You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wy...@apache.org on 2023/03/09 01:35:35 UTC

[asterixdb] branch master updated: [ASTERIXDB-3129][STO][RT] Add column encoders/decoders

This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a0faa08ad [ASTERIXDB-3129][STO][RT] Add column encoders/decoders
1a0faa08ad is described below

commit 1a0faa08ad5445f968d669054f2d6481af5199bb
Author: Wail Alkowaileet <wa...@gmail.com>
AuthorDate: Wed Mar 8 08:18:44 2023 -0800

    [ASTERIXDB-3129][STO][RT] Add column encoders/decoders
    
    - user mode changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Add a modified version of Parquet's encoders/decoders
      that fits our needs and avoids object creation. Also,
      accepts Hyracks values (i.e., IValueReference)
    - Add column streams (in/out) for reading/writing
      encoded column values
    
    Interface changes:
    Add close() to the interface for IColumnTupleIterator
    to log the number of filtered pages.
    
    Change-Id: Ib185ba5da37b4c88523a028e7cc4108aefc0145a
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17413
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 asterixdb/asterix-column/pom.xml                   | 151 ++++++++++
 .../bytes/ParquetDeltaBinaryPackingConfig.java     |  75 +++++
 .../bytes/decoder/AbstractParquetValuesReader.java |  50 ++++
 .../ParquetDeltaBinaryPackingValuesReader.java     | 216 +++++++++++++++
 .../bytes/decoder/ParquetDeltaByteArrayReader.java | 112 ++++++++
 .../ParquetDeltaLengthByteArrayValuesReader.java   |  68 +++++
 .../decoder/ParquetDoublePlainValuesReader.java    |  52 ++++
 .../ParquetRunLengthBitPackingHybridDecoder.java   | 146 ++++++++++
 ...tractParquetDeltaBinaryPackingValuesWriter.java | 119 ++++++++
 .../bytes/encoder/AbstractParquetValuesWriter.java |  87 ++++++
 ...etDeltaBinaryPackingValuesWriterForInteger.java | 233 ++++++++++++++++
 ...rquetDeltaBinaryPackingValuesWriterForLong.java | 234 ++++++++++++++++
 .../bytes/encoder/ParquetDeltaByteArrayWriter.java | 114 ++++++++
 .../ParquetDeltaLengthByteArrayValuesWriter.java   |  91 +++++++
 .../bytes/encoder/ParquetPlainValuesWriter.java    |  87 ++++++
 .../ParquetRunLengthBitPackingHybridEncoder.java   | 263 ++++++++++++++++++
 .../bytes/stream/in/AbstractBytesInputStream.java  |  75 +++++
 .../bytes/stream/in/ByteBufferInputStream.java     | 169 ++++++++++++
 .../stream/in/MultiByteBufferInputStream.java      | 303 +++++++++++++++++++++
 .../stream/out/AbstractBytesOutputStream.java      | 101 +++++++
 .../out/AbstractMultiBufferBytesOutputStream.java  | 164 +++++++++++
 .../bytes/stream/out/ByteBufferOutputStream.java   |  47 ++++
 .../stream/out/GrowableBytesOutputStream.java      |  86 ++++++
 .../MultiPersistentBufferBytesOutputStream.java    |  52 ++++
 .../out/MultiTemporaryBufferBytesOutputStream.java |  57 ++++
 .../column/bytes/stream/out/ParquetBytesInput.java |  48 ++++
 .../out/pointer/ByteBufferReservedPointer.java     |  51 ++++
 .../stream/out/pointer/GrowableBytesPointer.java   |  55 ++++
 .../bytes/stream/out/pointer/IReservedPointer.java |  50 ++++
 asterixdb/pom.xml                                  |  28 ++
 .../data/std/util/ArrayBackedValueStorage.java     |   6 +-
 .../hyracks-storage-am-lsm-btree-column/pom.xml    |   4 +
 .../lsm/btree/column/api/IColumnTupleIterator.java |   2 +
 .../impls/btree/ColumnBTreeRangeSearchCursor.java  |   1 +
 .../lsm/tuples/AbstractColumnTupleReference.java   |  17 ++
 .../apache/hyracks/util/string/UTF8StringUtil.java |   4 +
 36 files changed, 3417 insertions(+), 1 deletion(-)

diff --git a/asterixdb/asterix-column/pom.xml b/asterixdb/asterix-column/pom.xml
new file mode 100644
index 0000000000..2ee75c84a7
--- /dev/null
+++ b/asterixdb/asterix-column/pom.xml
@@ -0,0 +1,151 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>apache-asterixdb</artifactId>
+    <groupId>org.apache.asterix</groupId>
+    <version>0.9.8-SNAPSHOT</version>
+  </parent>
+  <artifactId>asterix-column</artifactId>
+
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+      <comments>A business-friendly OSS license</comments>
+    </license>
+  </licenses>
+
+  <properties>
+    <root.dir>${basedir}/..</root.dir>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>default</id>
+            <phase>validate</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+            <configuration>
+              <licenses>
+                <license implementation="org.apache.rat.analysis.license.ApacheSoftwareLicense20"/>
+              </licenses>
+              <excludes combine.children="append">
+                <exclude>src/test/resources/result/**</exclude>
+              </excludes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-common</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-om</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-runtime</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.asterix</groupId>
+      <artifactId>asterix-external-data</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-util</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-data-std</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-dataflow-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-lsm-btree-column</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>algebricks-data</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-am-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hyracks</groupId>
+      <artifactId>hyracks-storage-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-column</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-encoding</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java
new file mode 100644
index 0000000000..f591d575fc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/ParquetDeltaBinaryPackingConfig.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Copy of {@link org.apache.parquet.column.values.delta.DeltaBinaryPackingConfig}
+ * <p>
+ * Describes the block layout of the delta-binary-packing encoding: the number of values per block, the number of
+ * mini blocks per block, and the derived number of values per mini block. Unlike the Parquet original, an existing
+ * instance can be re-initialized in place through
+ * {@link #readConfig(InputStream, ParquetDeltaBinaryPackingConfig)}, which avoids allocating one config per page.
+ */
+public class ParquetDeltaBinaryPackingConfig {
+    private int blockSizeInValues;
+    private int miniBlockNumInABlock;
+    private int miniBlockSizeInValues;
+
+    /**
+     * @param blockSizeInValues    number of values in a block
+     * @param miniBlockNumInABlock number of mini blocks in a block
+     * @throws IllegalArgumentException if {@code blockSizeInValues / miniBlockNumInABlock} is not a multiple of 8
+     */
+    public ParquetDeltaBinaryPackingConfig(int blockSizeInValues, int miniBlockNumInABlock) {
+        reset(blockSizeInValues, miniBlockNumInABlock);
+    }
+
+    /**
+     * Validates the layout and only then stores it. If the layout is rejected, a reused instance keeps its previous,
+     * consistent state instead of being left half-updated.
+     */
+    private void reset(int blockSizeInValues, int miniBlockNumInABlock) {
+        // floating-point division so that a non-integral mini block size fails the multiple-of-8 check
+        double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+        Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+        this.blockSizeInValues = blockSizeInValues;
+        this.miniBlockNumInABlock = miniBlockNumInABlock;
+        this.miniBlockSizeInValues = (int) miniSize;
+    }
+
+    /**
+     * Reads a config written by {@link #toBytesInput()}: the block size and the number of mini blocks, each as an
+     * unsigned varint.
+     *
+     * @param in     stream positioned at the config
+     * @param config instance to re-initialize, or {@code null} to allocate a new one
+     * @return {@code config} re-initialized with the values read, or a new instance when {@code config} is null
+     * @throws IOException if the stream cannot be read
+     */
+    public static ParquetDeltaBinaryPackingConfig readConfig(InputStream in, ParquetDeltaBinaryPackingConfig config)
+            throws IOException {
+        final int blockSizeInValues = BytesUtils.readUnsignedVarInt(in);
+        final int miniBlockNumInABlock = BytesUtils.readUnsignedVarInt(in);
+        if (config == null) {
+            return new ParquetDeltaBinaryPackingConfig(blockSizeInValues, miniBlockNumInABlock);
+        }
+        config.reset(blockSizeInValues, miniBlockNumInABlock);
+        return config;
+    }
+
+    /**
+     * @return the block size followed by the number of mini blocks, each as an unsigned varint
+     */
+    public BytesInput toBytesInput() {
+        return BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+                BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+    }
+
+    /**
+     * @return number of values in a block
+     */
+    public int getBlockSizeInValues() {
+        return blockSizeInValues;
+    }
+
+    /**
+     * @return number of mini blocks in a block
+     */
+    public int getMiniBlockNumInABlock() {
+        return miniBlockNumInABlock;
+    }
+
+    /**
+     * @return number of values in a mini block (always a multiple of 8)
+     */
+    public int getMiniBlockSizeInValues() {
+        return miniBlockSizeInValues;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
new file mode 100644
index 0000000000..5f5b88caf9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/AbstractParquetValuesReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * Base class of the column value decoders; replaces Parquet's {@link ValuesReader}.
+ * <p>
+ * A subclass overrides only the typed read methods it supports. Calling any other typed read throws an
+ * {@link UnsupportedOperationException} whose message is the concrete class name.
+ */
+public abstract class AbstractParquetValuesReader {
+    /**
+     * Prepares this reader to decode the values of a page from {@code stream}.
+     */
+    public abstract void initFromPage(AbstractBytesInputStream stream) throws IOException;
+
+    /**
+     * Moves past the next value without returning it.
+     */
+    public abstract void skip();
+
+    public int readInteger() {
+        throw unsupported();
+    }
+
+    public long readLong() {
+        throw unsupported();
+    }
+
+    public double readDouble() {
+        throw unsupported();
+    }
+
+    public IValueReference readBytes() {
+        throw unsupported();
+    }
+
+    /**
+     * Builds the exception thrown by every typed read that the concrete reader does not override.
+     */
+    private UnsupportedOperationException unsupported() {
+        return new UnsupportedOperationException(getClass().getName());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java
new file mode 100644
index 0000000000..9aafa0f1b6
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaBinaryPackingValuesReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.asterix.column.bytes.ParquetDeltaBinaryPackingConfig;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesReader}
+ * <p>
+ * Page layout: the block configuration (see {@link ParquetDeltaBinaryPackingConfig#readConfig}), the total value
+ * count (unsigned varint), the first value (zig-zag varlong), then the blocks. Each block starts with its minimum
+ * delta (zig-zag varlong) and one bit-width byte per mini block. These are followed by the bit-packed deltas of the
+ * mini blocks that still hold values of the page.
+ */
+public class ParquetDeltaBinaryPackingValuesReader extends AbstractParquetValuesReader {
+    /**
+     * number of values in the current page, as read from the page header
+     */
+    private int totalValueCount;
+    /**
+     * index in {@link #valuesBuffer} of the next value to hand out; reset whenever a new block is loaded
+     */
+    private int valuesBufferedRead;
+    /**
+     * number of values consumed (read or skipped) by the caller in the current page
+     */
+    private int valuesRead;
+
+    /**
+     * decoded values of the current block; for the first block, index 0 holds the first value from the page header
+     */
+    private long[] valuesBuffer;
+    /**
+     * number of entries filled in {@link #valuesBuffer}. Whole mini blocks are unpacked, so this can exceed the
+     * number of values left in the page; the surplus entries are padding and are never returned.
+     */
+    private int valuesBuffered;
+    private AbstractBytesInputStream in;
+    private ParquetDeltaBinaryPackingConfig config;
+    /**
+     * bit width of each mini block of the current block
+     */
+    private int[] bitWidths;
+    /**
+     * scratch buffer receiving the packed bytes of 8 values
+     */
+    private ByteBuffer bitWidthBuffer;
+    /**
+     * last value decoded before the current block; the block's deltas are accumulated on top of it
+     */
+    private long lastElement;
+
+    /**
+     * Reads the page header and, when the page has more than one value, the first block.
+     * <p>
+     * Loads one block at a time instead of eagerly loading all blocks as {@link DeltaBinaryPackingValuesReader}
+     * does. This keeps the size of {@link #valuesBuffer} fixed at one block (plus the first value).
+     */
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+        this.in = stream;
+        this.config = ParquetDeltaBinaryPackingConfig.readConfig(in, this.config);
+        this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
+        allocateValuesBuffer();
+        bitWidths = allocate(bitWidths, config.getMiniBlockNumInABlock());
+        valuesBuffered = 0;
+
+        valuesBufferedRead = 0;
+        valuesRead = 0;
+
+        //read first value from header
+        valuesBuffer[valuesBuffered++] = BytesUtils.readZigZagVarLong(in);
+        lastElement = valuesBuffer[0];
+
+        if (valuesBuffered < totalValueCount) {
+            loadNewBlockToBuffer();
+        }
+    }
+
+    /**
+     * The value buffer is sized as a multiple of the mini block size, because the writer flushes data one mini
+     * block at a time. An existing buffer is reused (and zeroed) when it is large enough.
+     */
+    private void allocateValuesBuffer() {
+        //+ 1 because first value written to header is also stored in values buffer
+        final int bufferSize = config.getMiniBlockSizeInValues() * config.getMiniBlockNumInABlock() + 1;
+        if (valuesBuffer == null || valuesBuffer.length < bufferSize) {
+            valuesBuffer = new long[bufferSize];
+        } else {
+            Arrays.fill(valuesBuffer, 0);
+        }
+    }
+
+    /**
+     * @return {@code array} if it holds at least {@code size} entries, otherwise a new array of {@code size}
+     */
+    private int[] allocate(int[] array, int size) {
+        if (array == null || array.length < size) {
+            return new int[size];
+        }
+        return array;
+    }
+
+    /**
+     * Skips the next value.
+     *
+     * @throws ParquetDecodingException if the page has no more values or the next block cannot be loaded
+     */
+    @Override
+    public void skip() {
+        checkRead();
+        valuesRead++;
+        // advance the buffer cursor too; otherwise the next read would return the skipped value and the block
+        // reload in checkRead() would no longer line up with the page position
+        valuesBufferedRead++;
+    }
+
+    @Override
+    public int readInteger() {
+        // TODO: probably implement it separately
+        return (int) readLong();
+    }
+
+    /**
+     * @return the next value of the page
+     * @throws ParquetDecodingException if the page has no more values or the next block cannot be loaded
+     */
+    @Override
+    public long readLong() {
+        checkRead();
+        valuesRead++;
+        return valuesBuffer[valuesBufferedRead++];
+    }
+
+    /**
+     * Fails if every value of the page has been consumed. Otherwise, once the buffered values are used up, loads
+     * the next block into {@link #valuesBuffer}.
+     */
+    private void checkRead() {
+        if (valuesRead >= totalValueCount) {
+            throw new ParquetDecodingException("no more value to read, total value count is " + totalValueCount);
+        }
+        if (valuesBufferedRead >= valuesBuffered) {
+            //Set the last value buffered as the first
+            lastElement = valuesBuffer[valuesBufferedRead - 1];
+            valuesBufferedRead = 0;
+            valuesBuffered = 0;
+            Arrays.fill(valuesBuffer, 0);
+            try {
+                loadNewBlockToBuffer();
+            } catch (IOException e) {
+                throw new ParquetDecodingException("can not load next block", e);
+            }
+
+        }
+    }
+
+    /**
+     * Reads one block header and unpacks the mini blocks that still hold values of the page. The unpacked deltas
+     * are then turned into values by adding the block's minimum delta and the running previous value, starting
+     * from {@link #lastElement}.
+     */
+    private void loadNewBlockToBuffer() throws IOException {
+        long minDeltaInCurrentBlock;
+        try {
+            minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("can not read min delta in current block", e);
+        }
+
+        readBitWidthsForMiniBlocks();
+
+        // mini block is atomic for reading, we read a mini block when there are more values left
+        int i;
+        for (i = 0; i < config.getMiniBlockNumInABlock() && valuesRead + valuesBuffered < totalValueCount; i++) {
+            BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidths[i]);
+            unpackMiniBlock(packer);
+        }
+
+        //calculate values from deltas unpacked for current block
+        int valueUnpacked = i * config.getMiniBlockSizeInValues();
+        long prev = lastElement;
+        for (int j = valuesBuffered - valueUnpacked; j < valuesBuffered; j++) {
+            valuesBuffer[j] += minDeltaInCurrentBlock + prev;
+            prev = valuesBuffer[j];
+        }
+    }
+
+    /**
+     * mini block has a size of 8*n, unpack 8 value each time
+     *
+     * @param packer the packer created from bitwidth of current mini block
+     */
+    private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
+        for (int j = 0; j < config.getMiniBlockSizeInValues(); j += 8) {
+            unpack8Values(packer);
+        }
+    }
+
+    /**
+     * Unpacks 8 deltas into {@link #valuesBuffer} at {@link #valuesBuffered} and advances that count by 8.
+     */
+    private void unpack8Values(BytePackerForLong packer) throws IOException {
+        // get a single buffer of 8 values. most of the time, this won't require a copy
+        ByteBuffer buffer = readBitWidth(packer.getBitWidth());
+        packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
+        this.valuesBuffered += 8;
+    }
+
+    /**
+     * Reads one bit-width byte for each mini block of the current block into {@link #bitWidths}.
+     */
+    private void readBitWidthsForMiniBlocks() {
+        for (int i = 0; i < config.getMiniBlockNumInABlock(); i++) {
+            try {
+                bitWidths[i] = BytesUtils.readIntLittleEndianOnOneByte(in);
+            } catch (IOException e) {
+                throw new ParquetDecodingException("Can not decode bit width in block header", e);
+            }
+        }
+    }
+
+    /**
+     * @return the reusable scratch buffer, grown if needed, cleared and limited to {@code length} bytes
+     */
+    private ByteBuffer prepareBitWidthBuffer(int length) {
+        if (bitWidthBuffer == null || bitWidthBuffer.capacity() < length) {
+            bitWidthBuffer = ByteBuffer.allocate(length);
+        }
+        bitWidthBuffer.clear();
+        bitWidthBuffer.limit(length);
+        return bitWidthBuffer;
+    }
+
+    /**
+     * Reads exactly {@code length} bytes (the packed size of 8 values at that bit width).
+     *
+     * @return the scratch buffer holding those bytes, positioned at 0
+     * @throws EOFException if fewer than {@code length} bytes were read
+     */
+    private ByteBuffer readBitWidth(int length) throws IOException {
+        ByteBuffer buffer = prepareBitWidthBuffer(length);
+        int read = in.read(buffer);
+        if (read != length) {
+            throw new EOFException("Reached end of stream");
+        }
+        buffer.position(0);
+        return buffer;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java
new file mode 100644
index 0000000000..70c25b85c0
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaByteArrayReader.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link DeltaByteArrayReader}
+ * <p>
+ * Each value is rebuilt from the length of the prefix it shares with the previous value plus its own suffix.
+ * Prefix lengths are decoded by a {@link ParquetDeltaBinaryPackingValuesReader}, and suffixes by a
+ * {@link ParquetDeltaLengthByteArrayValuesReader}.
+ */
+public class ParquetDeltaByteArrayReader extends AbstractParquetValuesReader {
+    private final AbstractParquetValuesReader prefixLengthReader;
+    private final ParquetDeltaLengthByteArrayValuesReader suffixReader;
+    // scratch space for the encoded length placed in front of each value when containsLength is set
+    // (zero-sized otherwise)
+    private final byte[] lengthBytes;
+
+    // holds the value returned by readBytes(); its contents are replaced by the next call
+    private final ArrayBackedValueStorage temp;
+    // bytes of the last value, without the length prefix; source of the next value's shared prefix
+    private final ArrayBackedValueStorage previous;
+    // set by initFromPage and cleared by readBytes; not otherwise read in this class
+    boolean newPage;
+
+    /**
+     * @param containsLength whether each value returned by {@link #readBytes()} starts with its length, encoded
+     *                       with {@link UTF8StringUtil#encodeUTF8Length} (into at most 4 bytes of scratch space)
+     */
+    public ParquetDeltaByteArrayReader(boolean containsLength) {
+        this.prefixLengthReader = new ParquetDeltaBinaryPackingValuesReader();
+        this.suffixReader = new ParquetDeltaLengthByteArrayValuesReader();
+        this.temp = new ArrayBackedValueStorage();
+        this.previous = new ArrayBackedValueStorage();
+        lengthBytes = containsLength ? new byte[4] : new byte[0];
+    }
+
+    /**
+     * The page starts with the byte size of the prefix-length section (unsigned varint). That section is decoded
+     * by the prefix-length reader and the rest of the stream by the suffix reader. The previous value is cleared.
+     */
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+        AbstractBytesInputStream prefixStream = stream.sliceStream(BytesUtils.readUnsignedVarInt(stream));
+        prefixLengthReader.initFromPage(prefixStream);
+        suffixReader.initFromPage(stream);
+        previous.reset();
+        temp.reset();
+        newPage = true;
+    }
+
+    @Override
+    public void skip() {
+        // read the next value to skip so that previous is correct.
+        this.readBytes();
+    }
+
+    /**
+     * Rebuilds the next value as {@code previous[0, prefixLength)} followed by the next suffix. When lengths were
+     * requested in the constructor, the value is preceded by its encoded length.
+     *
+     * @return the rebuilt value; the same storage instance is returned and overwritten by every call
+     * @throws ParquetDecodingException if appending to the internal storage fails
+     */
+    @Override
+    public IValueReference readBytes() {
+        int prefixLength = prefixLengthReader.readInteger();
+        // This does not copy bytes
+        IValueReference suffix = suffixReader.readBytes();
+
+        // NOTE: due to PARQUET-246, it is important that we
+        // respect prefixLength which was read from prefixLengthReader,
+        // even for the *first* value of a page. Even though the first
+        // value of the page should have an empty prefix, it may not
+        // because of PARQUET-246.
+
+        // We have to do this to materialize the output
+        try {
+            int lengthSize;
+            if (prefixLength != 0) {
+                lengthSize = appendLength(prefixLength + suffix.getLength());
+                temp.append(previous.getByteArray(), previous.getStartOffset(), prefixLength);
+            } else {
+                lengthSize = appendLength(suffix.getLength());
+            }
+            temp.append(suffix);
+            /*
+             * Adding length after appending prefix and suffix is important as we do not overwrite the original
+             * previous bytes
+             * */
+            System.arraycopy(lengthBytes, 0, temp.getByteArray(), 0, lengthSize);
+            // remember the value without its length prefix for the next call's shared prefix
+            previous.set(temp.getByteArray(), temp.getStartOffset() + lengthSize, temp.getLength() - lengthSize);
+        } catch (IOException e) {
+            throw new ParquetDecodingException(e);
+        }
+        newPage = false;
+        return temp;
+    }
+
+    /**
+     * Starts a new value in {@code temp}. When lengths are requested, {@code length} is encoded into
+     * {@code lengthBytes} and {@code temp}'s size is set to the encoded size, so that the prefix and suffix are
+     * appended after it. The encoded bytes themselves are copied into {@code temp} later by {@link #readBytes()}.
+     *
+     * @param length total length of the value being rebuilt
+     * @return number of bytes set aside at the start of {@code temp} (0 when lengths are not requested)
+     */
+    private int appendLength(int length) {
+        if (lengthBytes.length > 0) {
+            int numOfBytes = UTF8StringUtil.encodeUTF8Length(length, lengthBytes, 0);
+            temp.setSize(numOfBytes);
+            return numOfBytes;
+        }
+        temp.setSize(0);
+        return 0;
+    }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java
new file mode 100644
index 0000000000..9913269de9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDeltaLengthByteArrayValuesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Decoder for the delta-length byte array encoding. A page holds a byte-size prefix (unsigned varint), then a
+ * delta-binary-packed section with the value lengths, then the bytes of all values back to back.
+ */
+public class ParquetDeltaLengthByteArrayValuesReader extends AbstractParquetValuesReader {
+
+    /**
+     * reusable result of {@link #readBytes()}; presumably re-pointed at each value by the stream's
+     * {@code read(IPointable, int)} rather than copied (TODO confirm against AbstractBytesInputStream)
+     */
+    private final VoidPointable value;
+    private final AbstractParquetValuesReader lengthReader;
+    private AbstractBytesInputStream in;
+
+    public ParquetDeltaLengthByteArrayValuesReader() {
+        this.lengthReader = new ParquetDeltaBinaryPackingValuesReader();
+        value = new VoidPointable();
+    }
+
+    /**
+     * Hands the length section to the length reader and keeps the rest of {@code stream} for the value bytes.
+     */
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+        AbstractBytesInputStream lengthStream = stream.sliceStream(BytesUtils.readUnsignedVarInt(stream));
+        lengthReader.initFromPage(lengthStream);
+        this.in = stream;
+    }
+
+    /**
+     * Skips the bytes of the next value.
+     *
+     * @throws ParquetDecodingException if the bytes cannot be skipped; the underlying IOException is kept as cause
+     */
+    @Override
+    public void skip() {
+        int length = lengthReader.readInteger();
+        try {
+            in.skipFully(length);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("Failed to skip " + length + " bytes", e);
+        }
+    }
+
+    /**
+     * @return the next value; the same {@link VoidPointable} instance is returned on every call
+     * @throws ParquetDecodingException if the bytes cannot be read; the underlying IOException is kept as cause
+     */
+    @Override
+    public IValueReference readBytes() {
+        int length = lengthReader.readInteger();
+        try {
+            in.read(value, length);
+            return value;
+        } catch (IOException e) {
+            throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+        }
+    }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
new file mode 100644
index 0000000000..196bec29b8
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Reads plain-encoded doubles: consecutive 8-byte values read through a little-endian data stream.
+ */
+public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader {
+    private LittleEndianDataInputStream in;
+
+    /**
+     * Prepares this reader for a new page; the doubles occupy the rest of {@code stream}.
+     */
+    @Override
+    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
+        this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    }
+
+    /**
+     * Skips the next double.
+     *
+     * @throws ParquetDecodingException if the bytes cannot be skipped or fewer than
+     *                                  {@link Double#BYTES} bytes remain
+     */
+    @Override
+    public void skip() {
+        int skipped;
+        try {
+            skipped = in.skipBytes(Double.BYTES);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not skip double", e);
+        }
+        // skipBytes may skip fewer bytes than requested (e.g., at the end of the stream). Continuing
+        // silently would misalign every subsequent readDouble()
+        if (skipped != Double.BYTES) {
+            throw new ParquetDecodingException("could not skip double");
+        }
+    }
+
+    /**
+     * Reads the next double.
+     *
+     * @throws ParquetDecodingException if the value cannot be read
+     */
+    @Override
+    public double readDouble() {
+        try {
+            return in.readDouble();
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read double", e);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java
new file mode 100644
index 0000000000..4607dc2c29
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetRunLengthBitPackingHybridDecoder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.decoder;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+
+/**
+ * Re-implementation of {@link RunLengthBitPackingHybridDecoder} that can be {@link #reset(InputStream) reset}
+ * onto a new stream and reuses its internal buffers between runs.
+ * <p>
+ * The input is a sequence of runs. Each run starts with an unsigned var-int header whose lowest bit selects
+ * the run type:
+ * <ul>
+ * <li>0 (RLE): {@code header >>> 1} repetitions of one value, padded to whole bytes for {@code bitWidth}</li>
+ * <li>1 (bit-packed): {@code header >>> 1} groups of 8 values, each group packed into {@code bitWidth}
+ * bytes</li>
+ * </ul>
+ */
+public class ParquetRunLengthBitPackingHybridDecoder {
+    private enum MODE {
+        RLE,
+        PACKED
+    }
+
+    private final int bitWidth;
+    private final BytePacker packer;
+    private InputStream in;
+
+    /**
+     * Type of the run currently being consumed.
+     */
+    private MODE mode;
+    /**
+     * Number of values left in the current run.
+     */
+    private int currentCount;
+    /**
+     * The repeated value of the current RLE run.
+     */
+    private int currentValue;
+    /**
+     * Number of decoded values of the current bit-packed run stored in {@link #currentBuffer}.
+     */
+    private int currentBufferLength;
+    /**
+     * Decoded values of the current bit-packed run (reused; may be longer than needed).
+     */
+    private int[] currentBuffer;
+    /**
+     * Raw bytes of the current bit-packed run (reused; may be longer than needed).
+     */
+    private byte[] bytes;
+
+    /**
+     * @param bitWidth number of bits per value, in [0, 32]
+     */
+    public ParquetRunLengthBitPackingHybridDecoder(int bitWidth) {
+        Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+        this.bitWidth = bitWidth;
+        this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+    }
+
+    /**
+     * Starts decoding {@code in}, discarding any partially consumed run.
+     */
+    public void reset(InputStream in) {
+        this.in = in;
+        currentCount = 0;
+        currentBufferLength = 0;
+    }
+
+    /**
+     * @return the next decoded value
+     * @throws HyracksDataException     if reading the underlying stream fails
+     * @throws IllegalArgumentException if the stream has no more runs
+     */
+    public int readInt() throws HyracksDataException {
+        try {
+            return nextInt();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private int nextInt() throws IOException {
+        // A run header may announce zero values; keep reading headers until a non-empty run is found.
+        // Reading past the end of the stream fails inside readNext().
+        while (currentCount == 0) {
+            readNext();
+        }
+        --currentCount;
+        switch (mode) {
+            case RLE:
+                return currentValue;
+            case PACKED:
+                // values of a packed run are consumed front to back
+                return currentBuffer[currentBufferLength - 1 - currentCount];
+            default:
+                throw new ParquetDecodingException("not a valid mode " + mode);
+        }
+    }
+
+    /**
+     * Reads the next run header and, for a bit-packed run, decodes all of its values into the buffer.
+     */
+    private void readNext() throws IOException {
+        Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream.");
+        final int header = BytesUtils.readUnsignedVarInt(in);
+        mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+        switch (mode) {
+            case RLE:
+                currentCount = header >>> 1;
+                currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth);
+                break;
+            case PACKED:
+                int numGroups = header >>> 1;
+                currentCount = numGroups * 8;
+                // each group of 8 values occupies exactly bitWidth bytes
+                allocateBuffers(currentCount, numGroups * bitWidth);
+                // At the end of the file RLE data though, there might not be that many bytes left.
+                int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0);
+                bytesToRead = Math.min(bytesToRead, in.available());
+                readFully(bytes, bytesToRead);
+                for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex +=
+                        bitWidth) {
+                    packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
+                }
+                break;
+            default:
+                throw new ParquetDecodingException("not a valid mode " + mode);
+        }
+    }
+
+    /**
+     * Makes sure the reusable buffers can hold the current run and clears the portions that will be used.
+     */
+    private void allocateBuffers(int intBufferLength, int byteBufferLength) {
+        if (currentBuffer == null || currentBuffer.length < intBufferLength) {
+            currentBuffer = new int[intBufferLength];
+        } else {
+            // only [0, intBufferLength) is ever read back, so only that prefix needs clearing
+            Arrays.fill(currentBuffer, 0, intBufferLength, 0);
+        }
+        currentBufferLength = intBufferLength;
+
+        if (bytes == null || bytes.length < byteBufferLength) {
+            bytes = new byte[byteBufferLength];
+        } else {
+            // only [0, byteBufferLength) is unpacked; clearing it makes a truncated final run decode
+            // its missing bytes as zeros instead of leftovers from a previous run
+            Arrays.fill(bytes, 0, byteBufferLength, (byte) 0);
+        }
+    }
+
+    /**
+     * Reads exactly {@code len} bytes into the start of {@code b}.
+     *
+     * @throws EOFException if the stream ends first
+     */
+    private void readFully(byte[] b, int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        }
+        int n = 0;
+        while (n < len) {
+            int count = in.read(b, n, len - n);
+            if (count < 0) {
+                throw new EOFException();
+            }
+            n += count;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
new file mode 100644
index 0000000000..3102063137
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetDeltaBinaryPackingValuesWriter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.ParquetDeltaBinaryPackingConfig;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriter}
+ * <p>
+ * Holds the state shared by the int and long variants: the block configuration, the per-mini-block bit widths,
+ * the output stream receiving encoded blocks, and the size estimate of values that are not flushed yet.
+ */
+public abstract class AbstractParquetDeltaBinaryPackingValuesWriter extends AbstractParquetValuesWriter {
+
+    public static final int DEFAULT_NUM_BLOCK_VALUES = 128;
+
+    public static final int DEFAULT_NUM_MINIBLOCKS = 4;
+
+    protected final MultiTemporaryBufferBytesOutputStream outputStream;
+
+    /**
+     * stores blockSizeInValues, miniBlockNumInABlock and miniBlockSizeInValues
+     */
+    protected final ParquetDeltaBinaryPackingConfig config;
+
+    /**
+     * bit width for each mini block, reused between flushes
+     */
+    protected final int[] bitWidths;
+
+    protected int totalValueCount = 0;
+
+    /**
+     * The number of delta values buffered by the subclass that have not been flushed to
+     * {@link #outputStream} yet (i.e., the end index of the subclass's delta buffer).
+     * It is reset after each flush.
+     */
+    protected int deltaValuesToFlush = 0;
+
+    /**
+     * Byte buffer for a single mini block, reused for every mini block. It is therefore sized for the
+     * largest mini block at the maximum bit width.
+     */
+    protected byte[] miniBlockByteBuffer;
+
+    /**
+     * Estimated element size after encoding.
+     * NOTE(review): subclasses compute this as a bit width, while {@link #getEstimatedSize()} adds the
+     * resulting estimate to a byte count; confirm whether the units are intended.
+     */
+    protected int estimatedElementSize = 0;
+    /**
+     * Estimated size for all non-flushed elements
+     */
+    protected int estimatedSize = 0;
+
+    protected AbstractParquetDeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        this.config = new ParquetDeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
+        bitWidths = new int[config.getMiniBlockNumInABlock()];
+        outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+    }
+
+    /**
+     * Writes the bit width of mini block {@code i} as a single byte.
+     */
+    protected void writeBitWidthForMiniBlock(int i) {
+        try {
+            BytesUtils.writeIntLittleEndianOnOneByte(outputStream, bitWidths[i]);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("can not write bit width for mini-block", e);
+        }
+    }
+
+    /**
+     * @return the number of mini blocks needed to hold {@code numberCount} values (rounded up)
+     */
+    protected int getMiniBlockCountToFlush(double numberCount) {
+        return (int) Math.ceil(numberCount / config.getMiniBlockSizeInValues());
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        this.totalValueCount = 0;
+        this.outputStream.reset();
+        this.deltaValuesToFlush = 0;
+        // discarded values must not keep contributing to getEstimatedSize()
+        this.estimatedElementSize = 0;
+        this.estimatedSize = 0;
+    }
+
+    @Override
+    public void close() {
+        this.totalValueCount = 0;
+        this.deltaValuesToFlush = 0;
+        this.estimatedElementSize = 0;
+        this.estimatedSize = 0;
+        outputStream.finish();
+    }
+
+    /**
+     * @return the bytes already written plus the estimate for values that have not been flushed yet
+     */
+    @Override
+    public int getEstimatedSize() {
+        return outputStream.size() + estimatedSize;
+    }
+
+    @Override
+    public int getAllocatedSize() {
+        return outputStream.capacity();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
new file mode 100644
index 0000000000..b53ded20b2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/AbstractParquetValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Base class for all column value encoders; it takes the place of {@link ValuesWriter}.
+ * <p>
+ * Every {@code writeXxx} method rejects its type by default. Each concrete encoder overrides only the
+ * methods for the value types it supports.
+ */
+public abstract class AbstractParquetValuesWriter {
+
+    /**
+     * @return the encoded bytes of the current page
+     */
+    public abstract BytesInput getBytes();
+
+    /**
+     * Discards the current buffer so the next page can be written; intended to follow {@link #getBytes()}.
+     */
+    public abstract void reset() throws HyracksDataException;
+
+    /**
+     * Closes this writer and releases its resources. The writer and any of its output streams must not be
+     * used afterwards.
+     */
+    public abstract void close();
+
+    /**
+     * @return an estimate of the encoded size of everything written so far
+     */
+    public abstract int getEstimatedSize();
+
+    /**
+     * @return how much buffer space this writer currently has allocated
+     */
+    public abstract int getAllocatedSize();
+
+    /**
+     * Encodes a boolean value.
+     *
+     * @param v the value
+     */
+    public void writeBoolean(boolean v) {
+        throw unsupported();
+    }
+
+    /**
+     * Encodes a byte-array value.
+     *
+     * @param v               the value
+     * @param skipLengthBytes {@code true} to omit the length prefix of a {@link UTF8StringPointable}
+     */
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+        throw unsupported();
+    }
+
+    /**
+     * Encodes an int value.
+     *
+     * @param v the value
+     */
+    public void writeInteger(int v) {
+        throw unsupported();
+    }
+
+    /**
+     * Encodes a long value.
+     *
+     * @param v the value
+     */
+    public void writeLong(long v) {
+        throw unsupported();
+    }
+
+    /**
+     * Encodes a double value.
+     *
+     * @param v the value
+     */
+    public void writeDouble(double v) {
+        throw unsupported();
+    }
+
+    /**
+     * @return the exception thrown by a {@code writeXxx} method this encoder does not support; its message is
+     * the runtime class name of the encoder
+     */
+    private UnsupportedOperationException unsupported() {
+        return new UnsupportedOperationException(getClass().getName());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java
new file mode 100644
index 0000000000..1c474fc21e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForInteger.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriterForInteger}
+ * <p>
+ * {@link #getBytes()} produces: block size, number of mini blocks and total value count (unsigned var-ints),
+ * the first value (zig-zag var-int), then the encoded blocks. Each block holds its minimum delta (zig-zag
+ * var-int), one bit-width byte per mini block, and the bit-packed deltas (relative to the minimum delta) of
+ * every mini block that holds values.
+ */
+public class ParquetDeltaBinaryPackingValuesWriterForInteger extends AbstractParquetDeltaBinaryPackingValuesWriter {
+    /**
+     * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+     * reused between flushes.
+     */
+    private static final int MAX_BITWIDTH = 32;
+
+    private final int blockSizeInValues;
+    private final int miniBlockNumInABlock;
+    private final int miniBlockSizeInValues;
+
+    /**
+     * stores delta values starting from the 2nd value written(1st value is stored in header).
+     * It's reused between flushes
+     */
+    private final int[] deltaBlockBuffer;
+
+    /**
+     * firstValue is written to the header of the page
+     */
+    private int firstValue = 0;
+
+    /**
+     * cache previous written value for calculating delta
+     */
+    private int previousValue = 0;
+
+    /**
+     * min delta is written to the beginning of each block.
+     * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+     * therefore are all positive
+     * it will be reset after each flush
+     */
+    private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
+    private int maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+    // The size estimate lives in the inherited estimatedSize field; declaring a private field with the same
+    // name here would hide it, and getEstimatedSize() would never see the estimate.
+
+    public ParquetDeltaBinaryPackingValuesWriterForInteger(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, multiPageOpRef);
+    }
+
+    /**
+     * @param blockSizeInValues number of deltas per block
+     * @param miniBlockNum      number of mini blocks per block; blockSizeInValues / miniBlockNum must be a
+     *                          multiple of 8
+     * @param multiPageOpRef    passed to the output stream created by the superclass
+     */
+    public ParquetDeltaBinaryPackingValuesWriterForInteger(int blockSizeInValues, int miniBlockNum,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        super(blockSizeInValues, miniBlockNum, multiPageOpRef);
+        this.blockSizeInValues = blockSizeInValues;
+        this.miniBlockNumInABlock = miniBlockNum;
+        double miniSize = (double) blockSizeInValues / miniBlockNumInABlock;
+        Preconditions.checkArgument(miniSize % 8 == 0, "miniBlockSize must be multiple of 8, but it's " + miniSize);
+        this.miniBlockSizeInValues = (int) miniSize;
+
+        deltaBlockBuffer = new int[blockSizeInValues];
+        miniBlockByteBuffer = new byte[miniBlockSizeInValues * MAX_BITWIDTH];
+    }
+
+    /**
+     * Buffers one value as a delta to the previous one (the first value goes to the page header) and
+     * flushes the block once it is full.
+     */
+    @Override
+    public void writeInteger(int v) {
+        totalValueCount++;
+
+        if (totalValueCount == 1) {
+            firstValue = v;
+            previousValue = firstValue;
+            return;
+        }
+
+        // Calculate delta. The possible overflow is accounted for. The algorithm is correct because
+        // Java int is working as a modular ring with base 2^32 and because of the plus and minus
+        // properties of a ring. http://en.wikipedia.org/wiki/Modular_arithmetic#Integers_modulo_n
+        int delta = v - previousValue;
+        previousValue = v;
+
+        deltaBlockBuffer[deltaValuesToFlush++] = delta;
+
+        if (delta < minDeltaInCurrentBlock) {
+            minDeltaInCurrentBlock = delta;
+        }
+
+        if (blockSizeInValues == deltaValuesToFlush) {
+            flushBlockBuffer();
+        } else {
+            //Recalibrate the estimated size
+            if (delta > maxDeltaInCurrentBlock) {
+                maxDeltaInCurrentBlock = delta;
+                // The difference wraps in int arithmetic, exactly like the rebased deltas in flushBlockBuffer(),
+                // so measure it as a 32-bit value (at most 32 bits, matching the width actually written)
+                estimatedElementSize =
+                        Integer.SIZE - Integer.numberOfLeadingZeros(maxDeltaInCurrentBlock - minDeltaInCurrentBlock);
+                estimatedSize = estimatedElementSize * deltaValuesToFlush;
+            } else {
+                estimatedSize += estimatedElementSize;
+            }
+        }
+    }
+
+    /**
+     * Encodes the buffered deltas as one block and clears the per-block state.
+     */
+    private void flushBlockBuffer() {
+        // since we store the min delta, the deltas will be converted to be the difference to min delta
+        // and all positive
+        for (int i = 0; i < deltaValuesToFlush; i++) {
+            deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+        }
+
+        writeMinDelta();
+        int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+        calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+        // a bit width is written for every mini block, including those past miniBlocksToFlush
+        for (int i = 0; i < miniBlockNumInABlock; i++) {
+            writeBitWidthForMiniBlock(i);
+        }
+
+        for (int i = 0; i < miniBlocksToFlush; i++) {
+            // writing i th miniblock
+            int currentBitWidth = bitWidths[i];
+            int blockOffset = 0;
+            BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(currentBitWidth);
+            int miniBlockStart = i * miniBlockSizeInValues;
+            int miniBlockEnd = miniBlockStart + miniBlockSizeInValues;
+            for (int j = miniBlockStart; j < miniBlockEnd; j += 8) {//8 values per pack
+                // mini block is atomic in terms of flushing
+                // This may write more values when reach to the end of data writing to last mini block,
+                // since it may not be aligned to miniblock,
+                // but doesn't matter. The reader uses total count to see if reached the end.
+                packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+                blockOffset += currentBitWidth;
+            }
+            try {
+                outputStream.write(miniBlockByteBuffer, 0, blockOffset);
+            } catch (IOException e) {
+                throw new ParquetEncodingException(e);
+            }
+        }
+
+        minDeltaInCurrentBlock = Integer.MAX_VALUE;
+        deltaValuesToFlush = 0;
+        estimatedElementSize = 0;
+        estimatedSize = 0;
+        maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+    }
+
+    private void writeMinDelta() {
+        try {
+            BytesUtils.writeZigZagVarInt(minDeltaInCurrentBlock, outputStream);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("can not write min delta for block", e);
+        }
+    }
+
+    /**
+     * iterate through values in each mini block and calculate the bitWidths of max values.
+     *
+     * @param miniBlocksToFlush number of miniblocks
+     */
+    private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+        for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+            int mask = 0;
+            int miniStart = miniBlockIndex * miniBlockSizeInValues;
+
+            /*
+             * The end of current mini block could be the end of current block(deltaValuesToFlush) buffer
+             * when data is not aligned to mini block
+             */
+            int miniEnd = Math.min((miniBlockIndex + 1) * miniBlockSizeInValues, deltaValuesToFlush);
+
+            for (int i = miniStart; i < miniEnd; i++) {
+                mask |= deltaBlockBuffer[i];
+            }
+            bitWidths[miniBlockIndex] = 32 - Integer.numberOfLeadingZeros(mask);
+        }
+    }
+
+    /**
+     * getBytes will trigger flushing block buffer, DO NOT write after getBytes() is called without calling reset()
+     *
+     * @return a BytesInput that contains the encoded page data
+     */
+    @Override
+    public BytesInput getBytes() {
+        // The Page Header should include: blockSizeInValues, numberOfMiniBlocks, totalValueCount
+        if (deltaValuesToFlush != 0) {
+            flushBlockBuffer();
+        }
+        BytesInput configBytes = BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+                BytesInput.fromUnsignedVarInt(miniBlockNumInABlock));
+        return BytesInput.concat(configBytes, BytesInput.fromUnsignedVarInt(totalValueCount),
+                BytesInput.fromZigZagVarInt(firstValue), outputStream.asBytesInput());
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        super.reset();
+        this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+        estimatedElementSize = 0;
+        estimatedSize = 0;
+        maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+        estimatedElementSize = 0;
+        estimatedSize = 0;
+        maxDeltaInCurrentBlock = Integer.MIN_VALUE;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java
new file mode 100644
index 0000000000..6ba40c1cf2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaBinaryPackingValuesWriterForLong.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaBinaryPackingValuesWriterForLong}
+ */
+public class ParquetDeltaBinaryPackingValuesWriterForLong extends AbstractParquetDeltaBinaryPackingValuesWriter {
+    /**
+     * max bitwidth for a mini block, it is used to allocate miniBlockByteBuffer which is
+     * reused between flushes.
+     */
+    private static final int MAX_BITWIDTH = 64;
+
+    private final int blockSizeInValues;
+    private final int miniBlockNumInABlock;
+    private final int miniBlockSizeInValues;
+
+    /**
+     * stores delta values starting from the 2nd value written(1st value is stored in header).
+     * It's reused between flushes
+     */
+    private final long[] deltaBlockBuffer;
+
+    /**
+     * firstValue is written to the header of the page
+     */
+    private long firstValue = 0;
+
+    /**
+     * cache previous written value for calculating delta
+     */
+    private long previousValue = 0;
+
+    /**
+     * min delta is written to the beginning of each block.
+     * it's zig-zag encoded. The deltas stored in each block is actually the difference to min delta,
+     * therefore are all positive
+     * it will be reset after each flush
+     */
+    private long minDeltaInCurrentBlock = Long.MAX_VALUE;
+    private long maxDeltaInCurrentBlock = Long.MIN_VALUE;
+
+    /**
+     * Creates a writer with {@link #DEFAULT_NUM_BLOCK_VALUES} values per block, split into
+     * {@link #DEFAULT_NUM_MINIBLOCKS} mini blocks.
+     */
+    public ParquetDeltaBinaryPackingValuesWriterForLong(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, multiPageOpRef);
+    }
+
+    /**
+     * @param blockSizeInValues number of deltas per block
+     * @param miniBlockNum      number of mini blocks per block; blockSizeInValues / miniBlockNum must be a
+     *                          multiple of 8
+     * @param multiPageOpRef    passed to the output stream created by the superclass
+     */
+    public ParquetDeltaBinaryPackingValuesWriterForLong(int blockSizeInValues, int miniBlockNum,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        super(blockSizeInValues, miniBlockNum, multiPageOpRef);
+        this.blockSizeInValues = blockSizeInValues;
+        this.miniBlockNumInABlock = miniBlockNum;
+        final double valuesPerMiniBlock = (double) blockSizeInValues / miniBlockNum;
+        Preconditions.checkArgument(valuesPerMiniBlock % 8 == 0,
+                "miniBlockSize must be multiple of 8, but it's " + valuesPerMiniBlock);
+        this.miniBlockSizeInValues = (int) valuesPerMiniBlock;
+        this.deltaBlockBuffer = new long[blockSizeInValues];
+        this.miniBlockByteBuffer = new byte[this.miniBlockSizeInValues * MAX_BITWIDTH];
+    }
+
+    /**
+     * Buffers one value. The first value of a page is kept for the page header. Every later value is stored
+     * as its delta to the previous value, and a full block is flushed immediately.
+     */
+    @Override
+    public void writeLong(long v) {
+        if (++totalValueCount == 1) {
+            firstValue = v;
+            previousValue = v;
+            return;
+        }
+
+        // The subtraction may overflow. That is harmless: long arithmetic is modulo 2^64, so adding the
+        // delta back to the previous value still yields v exactly.
+        final long delta = v - previousValue;
+        previousValue = v;
+        deltaBlockBuffer[deltaValuesToFlush++] = delta;
+        minDeltaInCurrentBlock = Math.min(minDeltaInCurrentBlock, delta);
+
+        if (deltaValuesToFlush == blockSizeInValues) {
+            flushBlockBuffer();
+            return;
+        }
+
+        // Keep a running estimate of the bits needed for the buffered deltas
+        if (delta <= maxDeltaInCurrentBlock) {
+            estimatedSize += estimatedElementSize;
+            return;
+        }
+        maxDeltaInCurrentBlock = delta;
+        estimatedElementSize = 64 - Long.numberOfLeadingZeros(maxDeltaInCurrentBlock - minDeltaInCurrentBlock);
+        estimatedSize = estimatedElementSize * deltaValuesToFlush;
+    }
+
+    /**
+     * Encodes the buffered deltas as one block, then clears the per-block state.
+     * <p>
+     * The deltas are rebased on the block's minimum delta. The minimum delta and one bit width per mini block
+     * are written, followed by the bit-packed deltas of each mini block that holds buffered values.
+     */
+    private void flushBlockBuffer() {
+        // since we store the min delta, the deltas will be converted to be the difference to min delta
+        // and all positive (modulo 2^64)
+        for (int i = 0; i < deltaValuesToFlush; i++) {
+            deltaBlockBuffer[i] = deltaBlockBuffer[i] - minDeltaInCurrentBlock;
+        }
+
+        writeMinDelta();
+        int miniBlocksToFlush = getMiniBlockCountToFlush(deltaValuesToFlush);
+
+        calculateBitWidthsForDeltaBlockBuffer(miniBlocksToFlush);
+        // a bit width is written for every mini block, including those past miniBlocksToFlush
+        for (int i = 0; i < miniBlockNumInABlock; i++) {
+            writeBitWidthForMiniBlock(i);
+        }
+
+        for (int i = 0; i < miniBlocksToFlush; i++) {
+            // writing i th miniblock
+            int currentBitWidth = bitWidths[i];
+            int blockOffset = 0;
+            // TODO: should this cache the packer?
+            BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(currentBitWidth);
+            int miniBlockStart = i * miniBlockSizeInValues;
+            int miniBlockEnd = miniBlockStart + miniBlockSizeInValues;
+            // pack values into the miniblock buffer, 8 at a time to get exactly currentBitWidth bytes
+            for (int j = miniBlockStart; j < miniBlockEnd; j += 8) {
+                // mini block is atomic in terms of flushing
+                // This may write more values when reach to the end of data writing to last mini block,
+                // since it may not be aligned to miniblock,
+                // but doesn't matter. The reader uses total count to see if reached the end.
+                packer.pack8Values(deltaBlockBuffer, j, miniBlockByteBuffer, blockOffset);
+                blockOffset += currentBitWidth;
+            }
+            try {
+                outputStream.write(miniBlockByteBuffer, 0, blockOffset);
+            } catch (IOException e) {
+                throw new ParquetEncodingException(e);
+            }
+        }
+
+        minDeltaInCurrentBlock = Long.MAX_VALUE;
+        maxDeltaInCurrentBlock = Long.MIN_VALUE;
+        deltaValuesToFlush = 0;
+        estimatedElementSize = 0;
+        estimatedSize = 0;
+    }
+
+    private void writeMinDelta() {
+        try {
+            BytesUtils.writeZigZagVarLong(minDeltaInCurrentBlock, outputStream);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("can not write min delta for block", e);
+        }
+    }
+
+    /**
+     * iterate through values in each mini block and calculate the bitWidths of max values.
+     *
+     * @param miniBlocksToFlush number of miniblocks
+     */
+    private void calculateBitWidthsForDeltaBlockBuffer(int miniBlocksToFlush) {
+        for (int miniBlockIndex = 0; miniBlockIndex < miniBlocksToFlush; miniBlockIndex++) {
+            long mask = 0;
+            int miniStart = miniBlockIndex * miniBlockSizeInValues;
+
+            //The end of current mini block could be the end of current block(deltaValuesToFlush) buffer
+            //when data is not aligned to mini block
+            int miniEnd = Math.min((miniBlockIndex + 1) * miniBlockSizeInValues, deltaValuesToFlush);
+
+            for (int i = miniStart; i < miniEnd; i++) {
+                mask |= deltaBlockBuffer[i];
+            }
+            bitWidths[miniBlockIndex] = 64 - Long.numberOfLeadingZeros(mask);
+        }
+    }
+
+    /**
+     * Returns the encoded page: block size, number of mini blocks per block and total value count (unsigned
+     * varints), the first value (zig-zag varlong), then the encoded blocks held by {@code outputStream}.
+     * <p>
+     * Any partially filled block is flushed first, so DO NOT write after getBytes() is called without calling
+     * reset().
+     *
+     * @return a BytesInput that contains the encoded page data
+     */
+    @Override
+    public BytesInput getBytes() {
+        if (deltaValuesToFlush != 0) {
+            // Encode the trailing (partial) block so that no buffered delta is lost
+            flushBlockBuffer();
+        }
+        final BytesInput pageHeader = BytesInput.concat(BytesInput.fromUnsignedVarInt(blockSizeInValues),
+                BytesInput.fromUnsignedVarInt(miniBlockNumInABlock), BytesInput.fromUnsignedVarInt(totalValueCount),
+                BytesInput.fromZigZagVarLong(firstValue));
+        return BytesInput.concat(pageHeader, outputStream.asBytesInput());
+    }
+
+    /**
+     * Prepares the writer for a new page: after {@code super.reset()}, clears the current block's delta range,
+     * the previous value and the size estimates.
+     */
+    @Override
+    public void reset() throws HyracksDataException {
+        super.reset();
+        estimatedSize = 0;
+        estimatedElementSize = 0;
+        previousValue = 0;
+        maxDeltaInCurrentBlock = Long.MIN_VALUE;
+        minDeltaInCurrentBlock = Long.MAX_VALUE;
+    }
+
+    /**
+     * Closes the writer via {@code super.close()} and clears the per-block state. Both ends of the delta range
+     * and the size estimates are cleared together, as flushBlockBuffer() does at the end of a block, so no stale
+     * maximum or estimate survives the close.
+     */
+    @Override
+    public void close() {
+        super.close();
+        this.minDeltaInCurrentBlock = Long.MAX_VALUE;
+        this.maxDeltaInCurrentBlock = Long.MIN_VALUE;
+        estimatedElementSize = 0;
+        estimatedSize = 0;
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
new file mode 100644
index 0000000000..1b4611656a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+
+/**
+ * Re-implementation of {@link DeltaByteArrayWriter}
+ * <p>
+ * Every value is split into the length of the prefix it shares with the previously written value (encoded by
+ * {@code prefixLengthWriter}) and the remaining suffix (encoded by {@code suffixWriter}).
+ */
+public class ParquetDeltaByteArrayWriter extends AbstractParquetValuesWriter {
+    private static final IValueReference EMPTY_VALUE = createEmptyValue();
+
+    private final ParquetDeltaBinaryPackingValuesWriterForInteger prefixLengthWriter;
+    private final ParquetDeltaLengthByteArrayValuesWriter suffixWriter;
+    /** Reusable view over the suffix of the value being written. */
+    private final VoidPointable suffix = new VoidPointable();
+    /** Private copy of the last written value, used to find the prefix shared with the next one. */
+    private final ArrayBackedValueStorage previous = new ArrayBackedValueStorage();
+
+    public ParquetDeltaByteArrayWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        prefixLengthWriter = new ParquetDeltaBinaryPackingValuesWriterForInteger(multiPageOpRef);
+        suffixWriter = new ParquetDeltaLengthByteArrayValuesWriter(multiPageOpRef);
+        suffix.set(EMPTY_VALUE);
+    }
+
+    /** Builds the zero-length value used to clear {@link #suffix}. */
+    private static IValueReference createEmptyValue() {
+        VoidPointable empty = new VoidPointable();
+        empty.set(new byte[0], 0, 0);
+        return empty;
+    }
+
+    /**
+     * Returns the encoded page: the size of the encoded prefix lengths (unsigned varint), the encoded prefix
+     * lengths, then the encoded suffixes.
+     */
+    @Override
+    public BytesInput getBytes() {
+        final BytesInput encodedPrefixLengths = prefixLengthWriter.getBytes();
+        final int encodedPrefixLengthsSize = (int) encodedPrefixLengths.size();
+        final BytesInput encodedSuffixes = suffixWriter.getBytes();
+        return BytesInput.concat(BytesInput.fromUnsignedVarInt(encodedPrefixLengthsSize), encodedPrefixLengths,
+                encodedSuffixes);
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        prefixLengthWriter.reset();
+        suffixWriter.reset();
+        forgetPreviousValue();
+    }
+
+    @Override
+    public void close() {
+        prefixLengthWriter.close();
+        suffixWriter.close();
+        forgetPreviousValue();
+    }
+
+    /** Drops the remembered previous value and the suffix view. */
+    private void forgetPreviousValue() {
+        previous.reset();
+        suffix.set(EMPTY_VALUE);
+    }
+
+    /** Sum of the estimated sizes of the prefix-length and suffix writers. */
+    @Override
+    public int getEstimatedSize() {
+        final int prefixSize = prefixLengthWriter.getEstimatedSize();
+        final int suffixSize = suffixWriter.getEstimatedSize();
+        return prefixSize + suffixSize;
+    }
+
+    /** Sum of the allocated sizes of the prefix-length and suffix writers. */
+    @Override
+    public int getAllocatedSize() {
+        final int prefixSize = prefixLengthWriter.getAllocatedSize();
+        final int suffixSize = suffixWriter.getAllocatedSize();
+        return prefixSize + suffixSize;
+    }
+
+    /**
+     * Writes a value.
+     *
+     * @param value           the value to write
+     * @param skipLengthBytes if {@code true}, the leading UTF-8 string length bytes (as measured by
+     *                        {@link UTF8StringUtil#getNumBytesToStoreLength}) are not written
+     */
+    @Override
+    public void writeBytes(IValueReference value, boolean skipLengthBytes) {
+        final byte[] data = value.getByteArray();
+        final int valueStart = value.getStartOffset();
+        final int lengthPrefixSize =
+                skipLengthBytes ? UTF8StringUtil.getNumBytesToStoreLength(data, valueStart) : 0;
+        appendValue(data, valueStart + lengthPrefixSize, value.getLength() - lengthPrefixSize);
+    }
+
+    /** Writes the shared-prefix length and the suffix of {@code bytes[offset, offset + length)}. */
+    private void appendValue(byte[] bytes, int offset, int length) {
+        final int prefixLength = sharedPrefixLength(bytes, offset, length);
+        prefixLengthWriter.writeInteger(prefixLength);
+        suffix.set(bytes, offset + prefixLength, length - prefixLength);
+        suffixWriter.writeBytes(suffix, false);
+        // Keep a private copy, as the caller's bytes could be evicted from the buffer cache
+        previous.set(bytes, offset, length);
+    }
+
+    /** Counts how many leading bytes {@code bytes[offset, offset + length)} shares with {@link #previous}. */
+    private int sharedPrefixLength(byte[] bytes, int offset, int length) {
+        final byte[] prevBytes = previous.getByteArray();
+        final int prevOffset = previous.getStartOffset();
+        final int limit = Math.min(length, previous.getLength());
+        int matched = 0;
+        while (matched < limit && bytes[offset + matched] == prevBytes[prevOffset + matched]) {
+            matched++;
+        }
+        return matched;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
new file mode 100644
index 0000000000..afab48eb82
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaLengthByteArrayValuesWriter.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link DeltaLengthByteArrayValuesWriter}
+ */
+public class ParquetDeltaLengthByteArrayValuesWriter extends AbstractParquetValuesWriter {
+    private final ParquetDeltaBinaryPackingValuesWriterForInteger lengthWriter;
+    private final MultiTemporaryBufferBytesOutputStream outputStream;
+    private final LittleEndianDataOutputStream out;
+
+    public ParquetDeltaLengthByteArrayValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+        out = new LittleEndianDataOutputStream(outputStream);
+        lengthWriter = new ParquetDeltaBinaryPackingValuesWriterForInteger(multiPageOpRef);
+    }
+
+    @Override
+    public void writeBytes(IValueReference value, boolean skipLengthBytes) {
+        try {
+            lengthWriter.writeInteger(value.getLength());
+            out.write(value.getByteArray(), value.getStartOffset(), value.getLength());
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write bytes", e);
+        }
+    }
+
+    @Override
+    public BytesInput getBytes() {
+        try {
+            out.flush();
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write page", e);
+        }
+        BytesInput lengthBytes = lengthWriter.getBytes();
+        BytesInput lengthSize = BytesInput.fromUnsignedVarInt((int) lengthBytes.size());
+        BytesInput arrayBytes = outputStream.asBytesInput();
+        return BytesInput.concat(lengthSize, lengthBytes, arrayBytes);
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        lengthWriter.reset();
+        outputStream.reset();
+    }
+
+    @Override
+    public void close() {
+        lengthWriter.close();
+        outputStream.finish();
+    }
+
+    @Override
+    public int getEstimatedSize() {
+        return lengthWriter.getEstimatedSize() + outputStream.size();
+    }
+
+    @Override
+    public int getAllocatedSize() {
+        return lengthWriter.getAllocatedSize() + outputStream.capacity();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
new file mode 100644
index 0000000000..0298e596b0
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.LittleEndianDataOutputStream;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.io.ParquetEncodingException;
+
+/**
+ * Re-implementation of {@link PlainValuesWriter}: values are appended as-is, in little-endian byte order, to a
+ * {@link MultiTemporaryBufferBytesOutputStream}.
+ */
+public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
+    public static final Charset CHARSET = StandardCharsets.UTF_8;
+
+    /** Byte stream holding the encoded page. */
+    private final AbstractBytesOutputStream bytesOut;
+    /** Little-endian data view over {@link #bytesOut}. */
+    private final LittleEndianDataOutputStream dataOut;
+
+    public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        final MultiTemporaryBufferBytesOutputStream stream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+        this.bytesOut = stream;
+        this.dataOut = new LittleEndianDataOutputStream(stream);
+    }
+
+    /**
+     * Appends the 8 little-endian bytes of {@code v}.
+     *
+     * @throws ParquetEncodingException if the write fails
+     */
+    @Override
+    public final void writeDouble(double v) {
+        try {
+            dataOut.writeDouble(v);
+        } catch (IOException ioException) {
+            throw new ParquetEncodingException("could not write double", ioException);
+        }
+    }
+
+    /**
+     * Flushes the data view and returns the bytes written so far.
+     *
+     * @throws ParquetEncodingException if the flush fails
+     */
+    @Override
+    public BytesInput getBytes() {
+        flushDataOut();
+        return bytesOut.asBytesInput();
+    }
+
+    /** Flushes {@link #dataOut}, translating I/O failures into {@link ParquetEncodingException}. */
+    private void flushDataOut() {
+        try {
+            dataOut.flush();
+        } catch (IOException ioException) {
+            throw new ParquetEncodingException("could not write page", ioException);
+        }
+    }
+
+    @Override
+    public void reset() throws HyracksDataException {
+        bytesOut.reset();
+    }
+
+    @Override
+    public void close() {
+        bytesOut.finish();
+    }
+
+    /** Number of bytes written to the underlying stream. */
+    @Override
+    public int getEstimatedSize() {
+        return bytesOut.size();
+    }
+
+    /** Capacity of the underlying stream. */
+    @Override
+    public int getAllocatedSize() {
+        return bytesOut.capacity();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java
new file mode 100644
index 0000000000..671e0a180a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetRunLengthBitPackingHybridEncoder.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.encoder;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.out.GrowableBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
+
+/**
+ * Re-implementation of {@link RunLengthBitPackingHybridEncoder}
+ * <p>
+ * Encodes integers of {@code bitWidth} bits as a sequence of runs appended to {@code outputStream}:
+ * <ul>
+ * <li>rle-run: an unsigned varint header {@code repeatCount << 1} (LSB 0), then the repeated value written by
+ * {@code BytesUtils.writeIntLittleEndianPaddedOnBitWidth}</li>
+ * <li>bit-packed-run: one header byte {@code (groupCount << 1) | 1} (LSB 1), then groups of 8 values, each group
+ * bit-packed (little-endian) into {@code bitWidth} bytes</li>
+ * </ul>
+ * Values are buffered 8 at a time. A value repeated at least 8 consecutive times is emitted as an rle-run; only
+ * repeats that have not already been bit-packed are counted. Otherwise every full buffer of 8 values is
+ * bit-packed.
+ */
+public class ParquetRunLengthBitPackingHybridEncoder {
+    /**
+     * Packs 8 values of {@code bitWidth} bits into {@code bitWidth} bytes
+     */
+    private final BytePacker packer;
+
+    /**
+     * Destination of all encoded runs
+     */
+    private final GrowableBytesOutputStream outputStream;
+
+    /**
+     * The bit width used for bit-packing and for writing
+     * the repeated-value
+     */
+    private final int bitWidth;
+
+    /**
+     * Values that are bit packed 8 at a time are packed into this
+     * buffer ({@code bitWidth} bytes long), which is then written to {@code outputStream}
+     */
+    private final byte[] packBuffer;
+
+    /**
+     * Previous value written, used to detect repeated values
+     */
+    private int previousValue;
+
+    /**
+     * We buffer 8 values at a time, and either bit pack them
+     * or discard them after writing a rle-run
+     */
+    private final int[] bufferedValues;
+    private int numBufferedValues;
+
+    /**
+     * How many times a value has been repeated
+     */
+    private int repeatCount;
+
+    /**
+     * How many groups of 8 values have been written
+     * to the current bit-packed-run
+     */
+    private int bitPackedGroupCount;
+
+    /**
+     * A "pointer" to a single byte reserved in {@code outputStream},
+     * which we use as our bit-packed-header. The byte is reserved when a
+     * bit-packed-run starts and filled in when the run ends.
+     * <p>
+     * We are only using one byte for this header,
+     * which limits us to writing 504 values per bit-packed-run.
+     * <p>
+     * MSB must be 0 for varint encoding, LSB must be 1 to signify
+     * that this is a bit-packed-header leaves 6 bits to write the
+     * number of 8-groups -> (2^6 - 1) * 8 = 504
+     */
+    private final IReservedPointer bitPackedRunHeaderPointer;
+
+    /**
+     * Set by {@link #toBytes()}; guards against calling it twice without a reset
+     */
+    private boolean toBytesCalled;
+
+    /**
+     * @param bitWidth number of bits of every encoded value; must be in [0, 32]
+     */
+    public ParquetRunLengthBitPackingHybridEncoder(int bitWidth) {
+
+        Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+
+        this.bitWidth = bitWidth;
+        this.outputStream = new GrowableBytesOutputStream();
+        this.bitPackedRunHeaderPointer = outputStream.createPointer();
+        // 8 values of bitWidth bits each fit exactly in bitWidth bytes
+        this.packBuffer = new byte[bitWidth];
+        this.bufferedValues = new int[8];
+        this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+        reset(false);
+    }
+
+    /**
+     * Clears all run state.
+     *
+     * @param resetBaos whether to also reset {@code outputStream}
+     */
+    private void reset(boolean resetBaos) {
+        if (resetBaos) {
+            this.outputStream.reset();
+        }
+        this.previousValue = 0;
+        this.numBufferedValues = 0;
+        this.repeatCount = 0;
+        this.bitPackedGroupCount = 0;
+        this.bitPackedRunHeaderPointer.reset();
+        this.toBytesCalled = false;
+    }
+
+    /**
+     * Encodes one value.
+     *
+     * @param value the value to encode (presumably expected to fit in {@code bitWidth} bits; not checked here)
+     * @throws IOException if writing to {@code outputStream} fails
+     */
+    public void writeInt(int value) throws IOException {
+        if (value == previousValue) {
+            // keep track of how many times we've seen this value
+            // consecutively
+            ++repeatCount;
+
+            if (repeatCount >= 8) {
+                // we've seen this at least 8 times, we're
+                // certainly going to write an rle-run,
+                // so just keep on counting repeats for now
+                return;
+            }
+        } else {
+            // This is a new value, check if it signals the end of
+            // an rle-run
+            if (repeatCount >= 8) {
+                // it does! write an rle-run
+                writeRleRun();
+            }
+
+            // this is a new value so we've only seen it once
+            repeatCount = 1;
+            // start tracking this value for repeats
+            previousValue = value;
+        }
+
+        // We have not seen enough repeats to justify an rle-run yet,
+        // so buffer this value in case we decide to write a bit-packed-run
+        bufferedValues[numBufferedValues] = value;
+        ++numBufferedValues;
+
+        if (numBufferedValues == 8) {
+            // we've encountered less than 8 repeated values, so
+            // either start a new bit-packed-run or append to the
+            // current bit-packed-run
+            writeOrAppendBitPackedRun();
+        }
+    }
+
+    /**
+     * Bit-packs the 8 buffered values into the current bit-packed-run, starting a new run (and reserving its
+     * header byte) when none is open or when the current one already holds 63 groups.
+     */
+    private void writeOrAppendBitPackedRun() throws IOException {
+        if (bitPackedGroupCount >= 63) {
+            // we've packed as many values as we can for this run,
+            // end it and start a new one
+            endPreviousBitPackedRun();
+        }
+
+        if (!bitPackedRunHeaderPointer.isSet()) {
+            // this is a new bit-packed-run, allocate a byte for the header
+            // and keep a "pointer" to it so that it can be mutated later
+            outputStream.reserveByte(bitPackedRunHeaderPointer);
+        }
+
+        packer.pack8Values(bufferedValues, 0, packBuffer, 0);
+        outputStream.write(packBuffer);
+
+        // empty the buffer, they've all been written
+        numBufferedValues = 0;
+
+        // clear the repeat count, as some repeated values
+        // may have just been bit packed into this run
+        repeatCount = 0;
+
+        ++bitPackedGroupCount;
+    }
+
+    /**
+     * If we are currently writing a bit-packed-run, update the
+     * bit-packed-header and consider this run to be over
+     * <p>
+     * does nothing if we're not currently writing a bit-packed run
+     */
+    private void endPreviousBitPackedRun() {
+        if (!bitPackedRunHeaderPointer.isSet()) {
+            // we're not currently in a bit-packed-run
+            return;
+        }
+
+        // create bit-packed-header, which needs to fit in 1 byte
+        byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1);
+
+        // update this byte
+        bitPackedRunHeaderPointer.setByte(bitPackHeader);
+
+        // mark that this run is over
+        bitPackedRunHeaderPointer.reset();
+
+        // reset the number of groups
+        bitPackedGroupCount = 0;
+    }
+
+    /**
+     * Closes any open bit-packed-run, then writes {@code previousValue} repeated {@code repeatCount} times as an
+     * rle-run and drops the buffered copies of it.
+     */
+    private void writeRleRun() throws IOException {
+        // we may have been working on a bit-packed-run
+        // so close that run if it exists before writing this
+        // rle-run
+        endPreviousBitPackedRun();
+
+        // write the rle-header (lsb of 0 signifies a rle run)
+        BytesUtils.writeUnsignedVarInt(repeatCount << 1, outputStream);
+        // write the repeated-value
+        BytesUtils.writeIntLittleEndianPaddedOnBitWidth(outputStream, previousValue, bitWidth);
+
+        // reset the repeat count
+        repeatCount = 0;
+
+        // throw away all the buffered values, they were just repeats and they've been written
+        numBufferedValues = 0;
+    }
+
+    /**
+     * Flushes any pending run and returns the encoded bytes. A trailing partial group is padded with zeros up to
+     * 8 values before being bit-packed.
+     *
+     * @return the content of {@code outputStream}
+     * @throws IOException if writing to {@code outputStream} fails
+     */
+    public BytesInput toBytes() throws IOException {
+        // calling toBytes() twice without reset() fails this Preconditions.checkArgument check
+        Preconditions.checkArgument(!toBytesCalled, "You cannot call toBytes() more than once without calling reset()");
+
+        // write anything that is buffered / queued up for an rle-run
+        if (repeatCount >= 8) {
+            writeRleRun();
+        } else if (numBufferedValues > 0) {
+            for (int i = numBufferedValues; i < 8; i++) {
+                bufferedValues[i] = 0;
+            }
+            writeOrAppendBitPackedRun();
+            endPreviousBitPackedRun();
+        } else {
+            endPreviousBitPackedRun();
+        }
+
+        toBytesCalled = true;
+        return outputStream.asBytesInput();
+    }
+
+    /**
+     * Reset this encoder for re-use
+     */
+    public void reset() {
+        reset(true);
+    }
+
+    /**
+     * Clears the run state without resetting {@code outputStream}, then calls {@code outputStream.finish()}.
+     */
+    public void close() {
+        reset(false);
+        outputStream.finish();
+    }
+
+    /**
+     * Returns a rough size estimate: {@code outputStream.size()} plus {@code repeatCount * bitWidth} for the
+     * pending repeats.
+     * <p>
+     * NOTE(review): {@code outputStream.size()} looks like a byte count whereas {@code repeatCount * bitWidth}
+     * looks like a bit count, and the {@code numBufferedValues} pending values are not included. Confirm this is
+     * the intended estimate.
+     */
+    public int getEstimatedSize() {
+        return outputStream.size() + repeatCount * bitWidth;
+    }
+
+    /**
+     * @return the capacity of {@code outputStream}
+     */
+    public int getAllocatedSize() {
+        return outputStream.capacity();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
new file mode 100644
index 0000000000..b50143bb7f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+/**
+ * Base class of the input streams used to read encoded column values. On top of the {@link InputStream} contract,
+ * a stream can be positioned over the buffer(s) of an {@link IColumnBufferProvider} or over another stream, sliced
+ * into sub-streams, and read into an {@link IPointable} or a {@link ByteBuffer}. Mark/reset is always supported.
+ */
+public abstract class AbstractBytesInputStream extends InputStream {
+
+    /* ------------------------------ positioning ------------------------------ */
+
+    /**
+     * Positions this stream over {@code stream}'s content, {@code bytesToSkip} bytes further on (e.g.
+     * ByteBufferInputStream duplicates the other stream's buffer and advances it by {@code bytesToSkip}).
+     */
+    public abstract void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException;
+
+    /**
+     * Positions this stream over the buffer(s) supplied by {@code bufferProvider}.
+     */
+    public abstract void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException;
+
+    /**
+     * Makes {@code buffer} readable through this stream.
+     */
+    protected abstract void addBuffer(ByteBuffer buffer);
+
+    @Override
+    public abstract void mark(int readLimit);
+
+    @Override
+    public abstract void reset() throws IOException;
+
+    @Override
+    public final boolean markSupported() {
+        return true;
+    }
+
+    /* -------------------------------- reading -------------------------------- */
+
+    @Override
+    public abstract int read() throws IOException;
+
+    @Override
+    public abstract int read(byte[] bytes, int offset, int length) throws IOException;
+
+    /**
+     * Reads into {@code out}.
+     */
+    public abstract int read(ByteBuffer out);
+
+    /**
+     * Points {@code pointable} at the next {@code length} bytes of this stream.
+     *
+     * @throws EOFException if fewer than {@code length} bytes remain
+     */
+    public abstract void read(IPointable pointable, int length) throws EOFException;
+
+    @Override
+    public abstract long skip(long n);
+
+    /**
+     * Skips exactly {@code n} bytes using a single {@link #skip(long)} call.
+     *
+     * @throws EOFException if {@code skip} skipped fewer than {@code n} bytes
+     */
+    public final void skipFully(long n) throws IOException {
+        final long actuallySkipped = skip(n);
+        if (actuallySkipped >= n) {
+            return;
+        }
+        throw new EOFException("Not enough bytes to skip: " + actuallySkipped + " < " + n);
+    }
+
+    @Override
+    public abstract int available();
+
+    /* -------------------------------- slicing -------------------------------- */
+
+    /**
+     * Returns a stream over the bytes not yet read.
+     */
+    public abstract AbstractBytesInputStream remainingStream() throws EOFException;
+
+    /**
+     * Returns a stream over the next {@code length} bytes.
+     */
+    public abstract AbstractBytesInputStream sliceStream(int length) throws EOFException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
new file mode 100644
index 0000000000..833765c316
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+/**
+ * {@link AbstractBytesInputStream} backed by a single {@link ByteBuffer}.
+ * <p>
+ * Reads consume the buffer by advancing its position. {@link #read(IPointable, int)} does not copy: the pointable
+ * references the buffer's backing array.
+ */
+public final class ByteBufferInputStream extends AbstractBytesInputStream {
+    private ByteBuffer buffer;
+    /** Position saved by {@link #mark(int)}, or -1 when no mark is set */
+    private int mark = -1;
+
+    @Override
+    public void reset(IColumnBufferProvider bufferProvider) {
+        addBuffer(bufferProvider.getBuffer());
+    }
+
+    /**
+     * Replaces the underlying buffer (this stream holds exactly one) and clears any mark.
+     */
+    @Override
+    protected void addBuffer(ByteBuffer buffer) {
+        this.buffer = buffer;
+        mark = -1;
+    }
+
+    /**
+     * Positions this stream on a duplicate of {@code stream}'s buffer, {@code bytesToSkip} bytes past its current
+     * position. The source stream is not modified.
+     *
+     * @throws EOFException if {@code stream} has fewer than {@code bytesToSkip} remaining bytes; this stream is left
+     *                      unchanged in that case
+     */
+    @Override
+    public void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException {
+        ByteBufferInputStream in = (ByteBufferInputStream) stream;
+        ByteBuffer duplicate = in.buffer.duplicate();
+        if (duplicate.remaining() < bytesToSkip) {
+            // Consistent with MultiByteBufferInputStream#resetAt instead of an IllegalArgumentException
+            throw new EOFException();
+        }
+        duplicate.position(duplicate.position() + bytesToSkip);
+        buffer = duplicate;
+        mark = -1;
+    }
+
+    /**
+     * Points {@code pointable} at the next {@code length} bytes (no copy) and advances past them.
+     *
+     * @throws EOFException if fewer than {@code length} bytes remain
+     */
+    @Override
+    public void read(IPointable pointable, int length) throws EOFException {
+        if (buffer.remaining() < length) {
+            throw new EOFException();
+        }
+
+        // Buffer positions are relative to arrayOffset() in the backing array
+        pointable.set(buffer.array(), buffer.arrayOffset() + buffer.position(), length);
+        buffer.position(buffer.position() + length);
+    }
+
+    /**
+     * @return the next byte as an unsigned value
+     * @throws EOFException at the end of the buffer (instead of returning -1)
+     */
+    @Override
+    public int read() throws IOException {
+        if (!buffer.hasRemaining()) {
+            throw new EOFException();
+        }
+        return buffer.get() & 0xFF; // as unsigned
+    }
+
+    @Override
+    public int read(byte[] bytes, int offset, int length) throws IOException {
+        if (length == 0) {
+            return 0;
+        }
+
+        int remaining = buffer.remaining();
+        if (remaining <= 0) {
+            return -1;
+        }
+
+        int bytesToRead = Math.min(remaining, length);
+        buffer.get(bytes, offset, bytesToRead);
+
+        return bytesToRead;
+    }
+
+    /**
+     * Skips up to {@code n} bytes.
+     *
+     * @return the number of bytes skipped, 0 when {@code n <= 0}, or -1 if the buffer is already exhausted
+     */
+    @Override
+    public long skip(long n) {
+        if (n <= 0) {
+            // A negative n must not move the position backwards
+            return 0;
+        }
+
+        if (!buffer.hasRemaining()) {
+            return -1;
+        }
+
+        // buffer.remaining is an int, so this will always fit in an int
+        int bytesToSkip = (int) Math.min(buffer.remaining(), n);
+        buffer.position(buffer.position() + bytesToSkip);
+
+        return bytesToSkip;
+    }
+
+    /**
+     * Copies as many remaining bytes as fit into {@code out}, then flips {@code out}.
+     *
+     * @return the number of bytes copied
+     */
+    @Override
+    public int read(ByteBuffer out) {
+        int bytesToCopy;
+        ByteBuffer copyBuffer;
+        if (buffer.remaining() <= out.remaining()) {
+            // copy the entire buffer
+            bytesToCopy = buffer.remaining();
+            copyBuffer = buffer;
+        } else {
+            // copy a slice of the current buffer (duplicate() keeps the position)
+            bytesToCopy = out.remaining();
+            copyBuffer = buffer.duplicate();
+            copyBuffer.limit(buffer.position() + bytesToCopy);
+            buffer.position(buffer.position() + bytesToCopy);
+        }
+
+        out.put(copyBuffer);
+        // NOTE(review): MultiByteBufferInputStream#read(ByteBuffer) does not flip out; kept for compatibility
+        out.flip();
+
+        return bytesToCopy;
+    }
+
+    /**
+     * @return a stream over the next {@code length} bytes (sharing content); this stream is advanced past them
+     * @throws EOFException if fewer than {@code length} bytes remain
+     */
+    @Override
+    public AbstractBytesInputStream sliceStream(int length) throws EOFException {
+        if (buffer.remaining() < length) {
+            throw new EOFException();
+        }
+        ByteBuffer copy = buffer.duplicate();
+        copy.limit(buffer.position() + length);
+        ByteBufferInputStream in = new ByteBufferInputStream();
+        in.addBuffer(copy);
+        buffer.position(buffer.position() + length);
+        return in;
+    }
+
+    /**
+     * @return a stream over all remaining bytes (sharing content); this stream is advanced to its limit
+     */
+    @Override
+    public AbstractBytesInputStream remainingStream() {
+        ByteBuffer remaining = buffer.duplicate();
+        buffer.position(buffer.limit());
+        ByteBufferInputStream in = new ByteBufferInputStream();
+        in.addBuffer(remaining);
+        return in;
+    }
+
+    /**
+     * Remembers the current position; {@code readlimit} is ignored.
+     */
+    @Override
+    public void mark(int readlimit) {
+        this.mark = buffer.position();
+    }
+
+    /**
+     * Returns to the marked position and clears the mark.
+     *
+     * @throws IOException if no mark is set
+     */
+    @Override
+    public void reset() throws IOException {
+        if (mark >= 0) {
+            buffer.position(mark);
+            this.mark = -1;
+        } else {
+            throw new IOException("No mark defined");
+        }
+    }
+
+    @Override
+    public int available() {
+        return buffer.remaining();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java
new file mode 100644
index 0000000000..31f81793dc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/MultiByteBufferInputStream.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+
+/**
+ * {@link AbstractBytesInputStream} over a sequence of {@link ByteBuffer}s that are consumed in order.
+ * <p>
+ * The next buffer is polled from a queue once the current one is exhausted. Values that fit in the current buffer
+ * are exposed by {@link #read(IPointable, int)} without copying. Values spanning buffers are copied into an internal
+ * storage that is reused by the next such read.
+ */
+public final class MultiByteBufferInputStream extends AbstractBytesInputStream {
+    private static final ByteBuffer EMPTY;
+
+    static {
+        EMPTY = ByteBuffer.allocate(0);
+        EMPTY.limit(0);
+    }
+
+    /** Buffers not reached yet; the head is polled when {@link #current} is exhausted */
+    private final Queue<ByteBuffer> buffers;
+    /** Holds values that span more than one buffer (see {@link #read(IPointable, int)}) */
+    private final ArrayBackedValueStorage tempPointableStorage;
+    /** Total number of bytes in this stream */
+    private int length;
+
+    /** Buffer currently being read */
+    private ByteBuffer current;
+    /** Number of bytes consumed from this stream so far */
+    private int position;
+
+    public MultiByteBufferInputStream() {
+        this.buffers = new ArrayDeque<>();
+        tempPointableStorage = new ArrayBackedValueStorage();
+        this.current = EMPTY;
+        this.position = 0;
+        this.length = 0;
+    }
+
+    /**
+     * Creates a stream over the next {@code len} bytes of {@code original}, advancing {@code original} past them.
+     * The new stream starts at position 0 and its length is the number of sliced bytes. As a result,
+     * {@link #available()} and {@link #remainingStream()} are relative to the slice rather than to {@code original}.
+     *
+     * @throws EOFException if {@code original} has fewer than {@code len} bytes left
+     */
+    private MultiByteBufferInputStream(MultiByteBufferInputStream original, int len) throws EOFException {
+        buffers = new ArrayDeque<>();
+        tempPointableStorage = new ArrayBackedValueStorage();
+        List<ByteBuffer> slices = original.sliceBuffers(len);
+        int slicedLength = 0;
+        for (ByteBuffer slice : slices) {
+            slicedLength += slice.remaining();
+        }
+        buffers.addAll(slices);
+        length = slicedLength;
+        position = 0;
+        // Never leave current null, even when the slice is empty
+        current = EMPTY;
+        nextBuffer();
+    }
+
+    /**
+     * Clears this stream and refills it with the buffers of {@code bufferProvider}.
+     */
+    @Override
+    public void reset(IColumnBufferProvider bufferProvider) throws HyracksDataException {
+        reset();
+        length = bufferProvider.getLength();
+        if (length > 0) {
+            bufferProvider.readAll(buffers);
+            nextBuffer();
+        }
+    }
+
+    /**
+     * Appends {@code buffer} to the queue of buffers to read.
+     */
+    @Override
+    protected void addBuffer(ByteBuffer buffer) {
+        buffers.add(buffer);
+        length += buffer.remaining();
+    }
+
+    /**
+     * Makes this stream a copy of {@code stream}, sharing content through duplicated buffers, then skips
+     * {@code bytesToSkip} bytes. NOTE(review): passing this stream itself would drop its queued buffers.
+     *
+     * @throws EOFException if fewer than {@code bytesToSkip} bytes could be skipped
+     */
+    @Override
+    public void resetAt(int bytesToSkip, AbstractBytesInputStream stream) throws IOException {
+        MultiByteBufferInputStream original = (MultiByteBufferInputStream) stream;
+        buffers.clear();
+        position = original.position;
+        length = original.length;
+        current = original.current.duplicate();
+        for (ByteBuffer buffer : original.buffers) {
+            buffers.add(buffer.duplicate());
+        }
+
+        if (skip(bytesToSkip) != bytesToSkip) {
+            throw new EOFException();
+        }
+    }
+
+    /**
+     * Skips up to {@code n} bytes, crossing buffer boundaries as needed.
+     *
+     * @return the number of bytes skipped, 0 when {@code n <= 0}, or -1 if nothing could be skipped
+     */
+    @Override
+    public long skip(long n) {
+        if (n <= 0) {
+            return 0;
+        }
+
+        if (current == null) {
+            return -1;
+        }
+
+        long bytesSkipped = 0;
+        while (bytesSkipped < n) {
+            if (current.remaining() > 0) {
+                long bytesToSkip = Math.min(n - bytesSkipped, current.remaining());
+                current.position(current.position() + (int) bytesToSkip);
+                bytesSkipped += bytesToSkip;
+                this.position += bytesToSkip;
+            } else if (!nextBuffer()) {
+                // there are no more buffers
+                return bytesSkipped > 0 ? bytesSkipped : -1;
+            }
+        }
+
+        return bytesSkipped;
+    }
+
+    /**
+     * Copies as many remaining bytes as fit into {@code out} (without flipping it).
+     *
+     * @return the number of bytes copied, or -1 if the stream is exhausted
+     */
+    @Override
+    public int read(ByteBuffer out) {
+        int len = out.remaining();
+        if (len <= 0) {
+            return 0;
+        }
+
+        if (current == null) {
+            return -1;
+        }
+
+        int bytesCopied = 0;
+        while (bytesCopied < len) {
+            if (current.remaining() > 0) {
+                int bytesToCopy;
+                ByteBuffer copyBuffer;
+                if (current.remaining() <= out.remaining()) {
+                    // copy all the current buffer
+                    bytesToCopy = current.remaining();
+                    copyBuffer = current;
+                } else {
+                    // copy a slice of the current buffer
+                    bytesToCopy = out.remaining();
+                    copyBuffer = current.duplicate();
+                    copyBuffer.limit(copyBuffer.position() + bytesToCopy);
+                    current.position(copyBuffer.position() + bytesToCopy);
+                }
+
+                out.put(copyBuffer);
+                bytesCopied += bytesToCopy;
+                this.position += bytesToCopy;
+
+            } else if (!nextBuffer()) {
+                // there are no more buffers
+                return bytesCopied > 0 ? bytesCopied : -1;
+            }
+        }
+
+        return bytesCopied;
+    }
+
+    /**
+     * @return a stream over the next {@code length} bytes; this stream is advanced past them
+     */
+    @Override
+    public AbstractBytesInputStream sliceStream(int length) throws EOFException {
+        return new MultiByteBufferInputStream(this, length);
+    }
+
+    /**
+     * @return a stream over all remaining bytes; this stream is advanced to its end
+     */
+    @Override
+    public AbstractBytesInputStream remainingStream() throws EOFException {
+        return new MultiByteBufferInputStream(this, length - position);
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len) {
+        if (len <= 0) {
+            if (len < 0) {
+                throw new IndexOutOfBoundsException("Read length must be greater than 0: " + len);
+            }
+            return 0;
+        }
+
+        if (current == null) {
+            return -1;
+        }
+
+        int bytesRead = 0;
+        while (bytesRead < len) {
+            if (current.remaining() > 0) {
+                int bytesToRead = Math.min(len - bytesRead, current.remaining());
+                current.get(bytes, off + bytesRead, bytesToRead);
+                bytesRead += bytesToRead;
+                this.position += bytesToRead;
+            } else if (!nextBuffer()) {
+                // there are no more buffers
+                return bytesRead > 0 ? bytesRead : -1;
+            }
+        }
+
+        return bytesRead;
+    }
+
+    @Override
+    public int read(byte[] bytes) {
+        return read(bytes, 0, bytes.length);
+    }
+
+    /**
+     * @return the next byte as an unsigned value
+     * @throws EOFException at the end of the stream (instead of returning -1)
+     */
+    @Override
+    public int read() throws IOException {
+        if (current == null) {
+            throw new EOFException();
+        }
+
+        while (true) {
+            if (current.remaining() > 0) {
+                this.position += 1;
+                return current.get() & 0xFF; // as unsigned
+            } else if (!nextBuffer()) {
+                // there are no more buffers
+                throw new EOFException();
+            }
+        }
+    }
+
+    /**
+     * Sets {@code pointable} to the next {@code length} bytes and advances past them. If the value spans buffers, it
+     * is copied into an internal storage, and the pointable stays valid only until the next such read.
+     *
+     * @throws EOFException if fewer than {@code length} bytes remain
+     */
+    @Override
+    public void read(IPointable pointable, int length) throws EOFException {
+        if (current.remaining() >= length) {
+            // Fast path: the value is contiguous in the current buffer; positions are relative to arrayOffset()
+            pointable.set(current.array(), current.arrayOffset() + current.position(), length);
+            current.position(current.position() + length);
+            position += length;
+        } else {
+            // Slow path: the value spans buffers (or the current one is exhausted), so copy it out
+            tempPointableStorage.setSize(length);
+            int bytesRead = read(tempPointableStorage.getByteArray(), 0, length);
+            if (bytesRead != length) {
+                throw new EOFException();
+            }
+            pointable.set(tempPointableStorage);
+        }
+    }
+
+    @Override
+    public int available() {
+        return length - position;
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void mark(int readLimit) {
+        throw new UnsupportedOperationException("mark() is not supported");
+    }
+
+    /**
+     * Clears this stream: drops all buffers and resets position and length to 0. It does not return to a mark.
+     */
+    @Override
+    public void reset() {
+        buffers.clear();
+        this.current = EMPTY;
+        this.position = 0;
+        this.length = 0;
+    }
+
+    /**
+     * Returns duplicates covering the next {@code sliceLength} bytes and advances this stream past them.
+     *
+     * @throws EOFException if fewer than {@code sliceLength} bytes remain
+     */
+    private List<ByteBuffer> sliceBuffers(long sliceLength) throws EOFException {
+        if (sliceLength <= 0) {
+            return Collections.emptyList();
+        }
+
+        if (current == null) {
+            throw new EOFException();
+        }
+
+        List<ByteBuffer> sliceBuffers = new ArrayList<>();
+        long bytesAccumulated = 0;
+        while (bytesAccumulated < sliceLength) {
+            if (current.remaining() > 0) {
+                // get a slice of the current buffer to return
+                // always fits in an int because remaining returns an int that is >= 0
+                int bufLen = (int) Math.min(sliceLength - bytesAccumulated, current.remaining());
+                ByteBuffer slice = current.duplicate();
+                slice.limit(slice.position() + bufLen);
+                sliceBuffers.add(slice);
+                bytesAccumulated += bufLen;
+
+                // update state; the bytes are considered read
+                current.position(current.position() + bufLen);
+                this.position += bufLen;
+            } else if (!nextBuffer()) {
+                // there are no more buffers
+                throw new EOFException();
+            }
+        }
+
+        return sliceBuffers;
+    }
+
+    /**
+     * Moves to the next queued buffer.
+     *
+     * @return false (leaving {@link #current} unchanged) if no buffer is left
+     */
+    private boolean nextBuffer() {
+        if (buffers.isEmpty()) {
+            return false;
+        }
+        current = buffers.poll();
+        return true;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java
new file mode 100644
index 0000000000..698eac41d3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractBytesOutputStream.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * {@link OutputStream} for encoded column values that also offers what {@link ValuesWriter} implementations need:
+ * a {@link BytesInput} over the written bytes, reservable slots that can be filled later through an
+ * {@link IReservedPointer}, and size/capacity accounting.
+ */
+public abstract class AbstractBytesOutputStream extends OutputStream {
+    /** Wrapper constructed once around this stream and handed out by {@link #asBytesInput()} */
+    private final ParquetBytesInput bytesInputWrapper;
+
+    protected AbstractBytesOutputStream() {
+        this.bytesInputWrapper = new ParquetBytesInput(this);
+    }
+
+    /* *************************************************
+     * Writing
+     * *************************************************
+     */
+
+    @Override
+    public abstract void write(int b) throws IOException;
+
+    @Override
+    public abstract void write(byte[] b, int off, int len) throws IOException;
+
+    /**
+     * Writes the whole array {@code b}.
+     */
+    @Override
+    public final void write(byte[] b) throws IOException {
+        final int wholeLength = b.length;
+        write(b, 0, wholeLength);
+    }
+
+    /**
+     * Writes the bytes referenced by {@code value}.
+     */
+    public final void write(IValueReference value) throws IOException {
+        final byte[] source = value.getByteArray();
+        final int start = value.getStartOffset();
+        final int valueLength = value.getLength();
+        write(source, start, valueLength);
+    }
+
+    /**
+     * @return the same {@link BytesInput} instance on every call, wrapping this stream
+     */
+    public final BytesInput asBytesInput() {
+        return bytesInputWrapper;
+    }
+
+    /* *************************************************
+     * Lifecycle
+     * *************************************************
+     */
+
+    /**
+     * Finishes writing to this stream.
+     */
+    public abstract void finish();
+
+    /**
+     * Resets the output stream.
+     */
+    public abstract void reset() throws HyracksDataException;
+
+    /* *************************************************
+     * Reserved slots
+     * *************************************************
+     */
+
+    /**
+     * Reserves one byte at the current position of the stream.
+     *
+     * @param pointer set to reference the reserved byte
+     */
+    public abstract void reserveByte(IReservedPointer pointer) throws IOException;
+
+    /**
+     * Reserves an integer at the current position of the stream.
+     *
+     * @param pointer set to reference the reserved integer
+     */
+    public abstract void reserveInteger(IReservedPointer pointer) throws IOException;
+
+    /**
+     * @return a reusable {@link IReservedPointer} suitable for this stream
+     */
+    public abstract IReservedPointer createPointer();
+
+    /* *************************************************
+     * Accounting and export
+     * *************************************************
+     */
+
+    /**
+     * @return number of bytes written
+     */
+    public abstract int size();
+
+    /**
+     * @return size of the allocated storage, in bytes
+     */
+    public abstract int capacity();
+
+    /**
+     * Writes the content of this stream to {@code outputStream}.
+     *
+     * @param outputStream destination stream
+     */
+    public abstract void writeTo(OutputStream outputStream) throws IOException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
new file mode 100644
index 0000000000..4b7c835641
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/AbstractMultiBufferBytesOutputStream.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.ByteBufferReservedPointer;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * {@link AbstractBytesOutputStream} that writes into a list of {@link ByteBuffer}s obtained from
+ * {@link #confiscateNewBuffer()}, moving to the next buffer whenever the current one is full.
+ */
+abstract class AbstractMultiBufferBytesOutputStream extends AbstractBytesOutputStream {
+    protected final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+    /** Allocated buffers, in write order */
+    protected final List<ByteBuffer> buffers;
+    /** Index of {@link #currentBuf} within {@link #buffers} */
+    protected int currentBufferIndex;
+    /** Total capacity of the buffers in {@link #buffers} */
+    protected int allocatedBytes;
+    /** Number of bytes written (including reserved ones) since the last {@link #reset()} */
+    protected int position;
+    /** Buffer currently being written */
+    protected ByteBuffer currentBuf;
+
+    AbstractMultiBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        this.multiPageOpRef = multiPageOpRef;
+        buffers = new ArrayList<>();
+    }
+
+    /**
+     * @return a new buffer to append to {@link #buffers}
+     */
+    protected abstract ByteBuffer confiscateNewBuffer() throws HyracksDataException;
+
+    /**
+     * Called first by {@link #reset()}. If it leaves {@link #allocatedBytes} at 0, a fresh buffer is allocated.
+     * Otherwise, the existing buffers are reused from the first one.
+     */
+    protected abstract void preReset() throws HyracksDataException;
+
+    @Override
+    public final void reset() throws HyracksDataException {
+        preReset();
+        position = 0;
+        if (allocatedBytes == 0) {
+            allocateBuffer();
+        }
+        currentBufferIndex = 0;
+        currentBuf = buffers.get(0);
+        currentBuf.clear();
+    }
+
+    @Override
+    public final void write(int b) throws IOException {
+        ensureCapacity(1);
+        currentBuf.put((byte) b);
+        position++;
+    }
+
+    @Override
+    public final void write(byte[] b, int off, int len) throws IOException {
+        ensureCapacity(len);
+        int remaining = len;
+        int offset = off;
+        // The bytes may span several buffers
+        while (remaining > 0) {
+            setNextBufferIfNeeded();
+            int writeLength = Math.min(remaining, currentBuf.remaining());
+            currentBuf.put(b, offset, writeLength);
+            position += writeLength;
+            offset += writeLength;
+            remaining -= writeLength;
+        }
+    }
+
+    @Override
+    public void reserveByte(IReservedPointer pointer) throws IOException {
+        ensureCapacity(Byte.BYTES);
+        int offset = getCurrentBufferPosition();
+        currentBuf.put((byte) 0);
+        position += 1;
+        ((ByteBufferReservedPointer) pointer).setPointer(currentBuf, offset);
+    }
+
+    /**
+     * Reserves 4 bytes in the current buffer.
+     * <p>
+     * NOTE(review): ensureCapacity() only guarantees total capacity. If the current buffer has 1-3 bytes left,
+     * putInt() throws BufferOverflowException. Presumably callers only reserve integers where that cannot
+     * happen (e.g., right after reset()); confirm.
+     */
+    @Override
+    public final void reserveInteger(IReservedPointer pointer) throws HyracksDataException {
+        ensureCapacity(Integer.BYTES);
+        int offset = getCurrentBufferPosition();
+        currentBuf.putInt(0);
+        position += Integer.BYTES;
+        ((ByteBufferReservedPointer) pointer).setPointer(currentBuf, offset);
+    }
+
+    @Override
+    public final IReservedPointer createPointer() {
+        return new ByteBufferReservedPointer();
+    }
+
+    /**
+     * @return the write position within the current buffer (not within the whole stream)
+     */
+    public final int getCurrentBufferPosition() {
+        return currentBuf.position();
+    }
+
+    @Override
+    public final int size() {
+        return position;
+    }
+
+    @Override
+    public final int capacity() {
+        return allocatedBytes;
+    }
+
+    /**
+     * Drops all buffers. {@link #reset()} must be called before writing again.
+     */
+    @Override
+    public final void finish() {
+        currentBuf = null;
+        buffers.clear();
+        allocatedBytes = 0;
+    }
+
+    /* *************************************************
+     * Helper methods
+     * *************************************************
+     */
+
+    /**
+     * Ensures {@code length} more bytes fit in the allocated buffers, then moves to the next buffer if the current
+     * one is full. The bytes are not guaranteed to be contiguous in a single buffer.
+     */
+    private void ensureCapacity(int length) throws HyracksDataException {
+        if (position + length > allocatedBytes) {
+            allocateMoreBuffers(length);
+        } else if (length > 0) {
+            setNextBufferIfNeeded();
+        }
+    }
+
+    /**
+     * Allocates buffers until {@code length} more bytes fit.
+     */
+    private void allocateMoreBuffers(int length) throws HyracksDataException {
+        // Compute the space missing beyond everything allocated so far. Buffers that are allocated but not reached
+        // yet (e.g., when preReset() keeps them) already count towards allocatedBytes.
+        int neededSpace = position + length - allocatedBytes;
+        while (neededSpace > 0) {
+            neededSpace -= allocateBuffer();
+        }
+        setNextBufferIfNeeded();
+    }
+
+    /**
+     * Switches to (and clears) the next buffer when the current one is full.
+     */
+    private void setNextBufferIfNeeded() {
+        if (currentBuf.remaining() == 0) {
+            currentBuf = buffers.get(++currentBufferIndex);
+            currentBuf.clear();
+        }
+    }
+
+    /**
+     * Confiscates and appends a new buffer.
+     *
+     * @return the capacity of the new buffer
+     */
+    private int allocateBuffer() throws HyracksDataException {
+        ByteBuffer buffer = confiscateNewBuffer();
+        buffers.add(buffer);
+        buffer.clear();
+        int size = buffer.capacity();
+        allocatedBytes += size;
+        return size;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
new file mode 100644
index 0000000000..8817ae64a5
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ByteBufferOutputStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * {@link OutputStream} that writes straight into a caller-supplied {@link ByteBuffer} at its current position.
+ * <p>
+ * No bounds management happens here: exceptions thrown by {@link ByteBuffer#put} propagate to the caller.
+ */
+public final class ByteBufferOutputStream extends OutputStream {
+    /** Destination buffer */
+    private ByteBuffer buffer;
+    /** Position of {@link #buffer} when it was passed to {@link #reset(ByteBuffer)} */
+    private int startOffset;
+
+    /**
+     * Redirects this stream to {@code buffer}, starting at the buffer's current position.
+     */
+    public void reset(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.startOffset = this.buffer.position();
+    }
+
+    /**
+     * @return number of bytes written since the last {@link #reset(ByteBuffer)}
+     */
+    public int size() {
+        final int currentPosition = buffer.position();
+        return currentPosition - startOffset;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        final byte lowByte = (byte) b;
+        buffer.put(lowByte);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        buffer.put(b, off, len);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java
new file mode 100644
index 0000000000..20daf7da7a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/GrowableBytesOutputStream.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.stream.out.pointer.GrowableBytesPointer;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public final class GrowableBytesOutputStream extends AbstractBytesOutputStream {
+    private final ArrayBackedValueStorage storage;
+
+    public GrowableBytesOutputStream() {
+        storage = new ArrayBackedValueStorage(128);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        storage.getDataOutput().write(b);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        storage.getDataOutput().write(b, off, len);
+    }
+
+    @Override
+    public void finish() {
+        reset();
+    }
+
+    @Override
+    public void reset() {
+        storage.reset();
+    }
+
+    @Override
+    public void reserveByte(IReservedPointer pointer) throws IOException {
+        ((GrowableBytesPointer) pointer).setPointer(storage.getLength());
+        storage.getDataOutput().write(0);
+    }
+
+    @Override
+    public void reserveInteger(IReservedPointer pointer) throws IOException {
+        ((GrowableBytesPointer) pointer).setPointer(storage.getLength());
+        storage.getDataOutput().writeInt(0);
+    }
+
+    @Override
+    public IReservedPointer createPointer() {
+        return new GrowableBytesPointer(storage);
+    }
+
+    @Override
+    public int size() {
+        return storage.getLength();
+    }
+
+    @Override
+    public int capacity() {
+        return storage.getByteArray().length;
+    }
+
+    @Override
+    public void writeTo(OutputStream outputStream) throws IOException {
+        outputStream.write(storage.getByteArray(), storage.getStartOffset(), storage.getLength());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java
new file mode 100644
index 0000000000..c910131a6c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiPersistentBufferBytesOutputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * A multi-buffer output stream whose buffers are confiscated as persistent pages from the
+ * {@link IColumnWriteMultiPageOp}. Before a reset, any allocated buffers are persisted through
+ * {@link IColumnWriteMultiPageOp#persist()}, so the content of this stream cannot be copied into another
+ * {@link OutputStream}.
+ */
+public final class MultiPersistentBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+    public MultiPersistentBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        super(multiPageOpRef);
+    }
+
+    @Override
+    protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+        return multiPageOpRef.getValue().confiscatePersistent();
+    }
+
+    @Override
+    protected void preReset() throws HyracksDataException {
+        if (allocatedBytes > 0) {
+            //Persist all buffers before resetting the stream
+            multiPageOpRef.getValue().persist();
+            allocatedBytes = 0;
+            buffers.clear();
+        }
+    }
+
+    /**
+     * Not supported: persistent buffers are handed to {@link IColumnWriteMultiPageOp#persist()} instead.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public void writeTo(OutputStream outputStream) {
+        // UnsupportedOperationException is the standard signal for an unsupported operation; IllegalAccessError is a
+        // JVM LinkageError meant for illegal class/member access detected by the runtime
+        throw new UnsupportedOperationException("Persistent stream cannot be written to other stream");
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
new file mode 100644
index 0000000000..cf2808e0e2
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/MultiTemporaryBufferBytesOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * A multi-buffer output stream whose buffers are confiscated as temporary pages from the
+ * {@link IColumnWriteMultiPageOp}. Its content can be copied into another {@link OutputStream} via
+ * {@link #writeTo(OutputStream)}.
+ */
+public final class MultiTemporaryBufferBytesOutputStream extends AbstractMultiBufferBytesOutputStream {
+    public MultiTemporaryBufferBytesOutputStream(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        super(multiPageOpRef);
+    }
+
+    @Override
+    protected void preReset() {
+        //NoOp
+    }
+
+    @Override
+    protected ByteBuffer confiscateNewBuffer() throws HyracksDataException {
+        return multiPageOpRef.getValue().confiscateTemporary();
+    }
+
+    /**
+     * Copies the written bytes of every buffer, from the first one up to and including the current one, into
+     * {@code outputStream}.
+     *
+     * @throws IllegalStateException if the number of copied bytes differs from the stream's position
+     */
+    @Override
+    public void writeTo(OutputStream outputStream) throws IOException {
+        int writtenSize = 0;
+        for (int i = 0; i <= currentBufferIndex; i++) {
+            ByteBuffer buffer = buffers.get(i);
+            int length = buffer.position();
+            // position() is relative to the buffer, not to its backing array; honor arrayOffset()
+            outputStream.write(buffer.array(), buffer.arrayOffset(), length);
+            writtenSize += length;
+        }
+        if (writtenSize != position) {
+            //Sanity check
+            throw new IllegalStateException(
+                    "Size is different: written " + writtenSize + " bytes but the stream position is " + position);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java
new file mode 100644
index 0000000000..c5ad38ea90
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ParquetBytesInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.bytes.BytesInput;
+
+/**
+ * Adapts an {@link AbstractBytesOutputStream} to Parquet's {@link BytesInput}, so that the stream's content can be
+ * concatenated with other {@link BytesInput}s. Writing delegates to {@link AbstractBytesOutputStream#writeTo}.
+ *
+ * @see ParquetDeltaBinaryPackingValuesWriterForLong#getBytes() as an example
+ */
+class ParquetBytesInput extends BytesInput {
+    private final AbstractBytesOutputStream source;
+
+    ParquetBytesInput(AbstractBytesOutputStream source) {
+        this.source = source;
+    }
+
+    @Override
+    public final long size() {
+        return source.size();
+    }
+
+    @Override
+    public final void writeAllTo(OutputStream out) throws IOException {
+        source.writeTo(out);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java
new file mode 100644
index 0000000000..8773a310d7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/ByteBufferReservedPointer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link IReservedPointer} that addresses an absolute offset inside a {@link ByteBuffer}. It is unset until
+ * {@link #setPointer(ByteBuffer, int)} is called and becomes unset again after {@link #reset()}.
+ */
+public class ByteBufferReservedPointer implements IReservedPointer {
+    private ByteBuffer buffer;
+    private int offset;
+
+    /**
+     * Point at {@code position} inside {@code target}.
+     */
+    public void setPointer(ByteBuffer target, int position) {
+        buffer = target;
+        offset = position;
+    }
+
+    @Override
+    public boolean isSet() {
+        return buffer != null;
+    }
+
+    @Override
+    public void reset() {
+        buffer = null;
+    }
+
+    @Override
+    public void setByte(byte value) {
+        // absolute put: the buffer's own position is left untouched
+        buffer.put(offset, value);
+    }
+
+    @Override
+    public void setInteger(int value) {
+        buffer.putInt(offset, value);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java
new file mode 100644
index 0000000000..0863c72e7f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/GrowableBytesPointer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * An {@link IReservedPointer} that addresses an offset inside the byte array of an {@link ArrayBackedValueStorage}.
+ * The byte array is fetched from the storage on every write, so writes always target the storage's current array.
+ */
+public class GrowableBytesPointer implements IReservedPointer {
+    private static final int UNSET = -1;
+    private final ArrayBackedValueStorage storage;
+    // Start unset (as ByteBufferReservedPointer does with a null buffer) so that isSet() is false until setPointer()
+    // is called, instead of silently pointing at offset 0
+    private int offset = UNSET;
+
+    public GrowableBytesPointer(ArrayBackedValueStorage storage) {
+        this.storage = storage;
+    }
+
+    /**
+     * Point at {@code offset} inside the storage's byte array.
+     */
+    public void setPointer(int offset) {
+        this.offset = offset;
+    }
+
+    @Override
+    public void setByte(byte value) {
+        storage.getByteArray()[offset] = value;
+    }
+
+    @Override
+    public void setInteger(int value) {
+        IntegerPointable.setInteger(storage.getByteArray(), offset, value);
+    }
+
+    @Override
+    public void reset() {
+        offset = UNSET;
+    }
+
+    @Override
+    public boolean isSet() {
+        return offset >= 0;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java
new file mode 100644
index 0000000000..46c4d3632c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/pointer/IReservedPointer.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out.pointer;
+
+import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+
+/**
+ * Pointer that references a position in {@link AbstractBytesOutputStream}, e.g., a placeholder reserved by the
+ * stream (see {@code reserveByte}/{@code reserveInteger}) whose value can be overwritten after more bytes have been
+ * written.
+ */
+public interface IReservedPointer {
+    /**
+     * Write a byte value at the pointer's position
+     *
+     * @param value byte value to be set
+     */
+    void setByte(byte value);
+
+    /**
+     * Write an integer value at the pointer's position
+     *
+     * @param value integer value to be set
+     */
+    void setInteger(int value);
+
+    /**
+     * Reset the pointer; afterwards {@link #isSet()} is expected to return {@code false}
+     */
+    void reset();
+
+    /**
+     * @return whether the pointer currently references a position
+     */
+    boolean isSet();
+}
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index bff631f7ff..85ee76d6fc 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -942,6 +942,7 @@
     <module>asterix-license</module>
     <module>asterix-geo</module>
     <module>asterix-spidersilk</module>
+    <module>asterix-column</module>
   </modules>
 
   <dependencyManagement>
@@ -1382,6 +1383,11 @@
         <artifactId>hyracks-storage-am-lsm-btree</artifactId>
         <version>${hyracks.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.hyracks</groupId>
+        <artifactId>hyracks-storage-am-lsm-btree-column</artifactId>
+        <version>${hyracks.version}</version>
+      </dependency>
       <dependency>
         <groupId>org.apache.hyracks</groupId>
         <artifactId>hyracks-storage-am-lsm-rtree</artifactId>
@@ -1896,6 +1902,28 @@
           </exclusion>
         </exclusions>
       </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-common</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-encoding</artifactId>
+        <version>${parquet.version}</version>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>org.kitesdk</groupId>
         <artifactId>kite-data-core</artifactId>
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
index d5a4481320..d4feff60a6 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/ArrayBackedValueStorage.java
@@ -65,8 +65,12 @@ public class ArrayBackedValueStorage implements IMutableValueStorage, IPointable
     }
 
+    /**
+     * Appends the bytes referenced by {@code value} (from its start offset, for its length).
+     */
     public void append(IValueReference value) throws HyracksDataException {
+        append(value.getByteArray(), value.getStartOffset(), value.getLength());
+    }
+
+    public void append(byte[] bytes, int start, int length) throws HyracksDataException {
         try {
-            data.append(value);
+            data.append(bytes, start, length);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
index 9e4a2974ab..24682a8098 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/pom.xml
@@ -87,5 +87,9 @@
       <artifactId>hyracks-util</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
index 2ffa1bbeb7..0c955002fa 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
@@ -75,4 +75,6 @@ public interface IColumnTupleIterator extends ILSMTreeTupleReference, Comparable
      * Calls {@link IBufferCache#unpin(ICachedPage)} for all columns' pages
      */
     void unpinColumnsPages() throws HyracksDataException;
+
+    /**
+     * Called when the iterator is no longer used (e.g., when the owning cursor is closed). Implementations may use
+     * it to report statistics such as the number of filtered pages.
+     */
+    void close();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index d0b7e2b482..fe980cc5e1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -194,6 +194,7 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
 
     @Override
     public void doClose() throws HyracksDataException {
+        frameTuple.close();
         releasePages();
         page0 = null;
         pred = null;
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index 5a3b111a82..d39f94e97d 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -29,14 +29,19 @@ import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public abstract class AbstractColumnTupleReference implements IColumnTupleIterator {
+    private static final Logger LOGGER = LogManager.getLogger();
     private static final String UNSUPPORTED_OPERATION_MSG = "Operation is not supported for column tuples";
     private final int componentIndex;
     private final ColumnBTreeReadLeafFrame frame;
     private final IColumnBufferProvider[] primaryKeyBufferProviders;
     private final IColumnBufferProvider[] buffersProviders;
     private final int numberOfPrimaryKeys;
+    private int totalNumberOfPages;
+    private int numOfSkippedPages;
     protected int tupleIndex;
 
     /**
@@ -68,6 +73,8 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
                 buffersProviders[i] = new ColumnSingleBufferProvider(columnIndex);
             }
         }
+        totalNumberOfPages = 0;
+        numOfSkippedPages = 0;
     }
 
     @Override
@@ -96,7 +103,10 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
                 provider.reset(frame);
                 startColumn(provider, tupleIndex, i, numberOfTuples);
             }
+        } else {
+            numOfSkippedPages++;
         }
+        totalNumberOfPages++;
     }
 
     protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
@@ -137,6 +147,13 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
         }
     }
 
+    /**
+     * Logs how many pages were skipped out of all pages visited since the previous close, then resets both counters
+     * so that a tuple reference reused across searches reports per-search numbers rather than cumulative ones.
+     */
+    @Override
+    public final void close() {
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info("Skipped {} pages out of {} in total", numOfSkippedPages, totalNumberOfPages);
+        }
+        totalNumberOfPages = 0;
+        numOfSkippedPages = 0;
+    }
+
     /* *************************************************************
      * Unsupported Operations
      * *************************************************************
diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
index c0475b1afc..cde79cb95a 100644
--- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/string/UTF8StringUtil.java
@@ -221,6 +221,10 @@ public class UTF8StringUtil {
         return VarLenIntEncoderDecoder.decode(b, s);
     }
 
+    public static int getNumBytesToStoreLength(byte[] bytes, int start) {
+        return getNumBytesToStoreLength(getUTFLength(bytes, start));
+    }
+
+    /**
+     * Returns the number of bytes {@link VarLenIntEncoderDecoder} requires to encode {@code strlen}.
+     */
     public static int getNumBytesToStoreLength(int strlen) {
         return VarLenIntEncoderDecoder.getBytesRequired(strlen);
     }