You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wy...@apache.org on 2023/03/09 22:58:21 UTC

[asterixdb] branch master updated: [ASTERIXDB-3130][STO] Introduce column readers/writers

This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 15a4af570b [ASTERIXDB-3130][STO] Introduce column readers/writers
15a4af570b is described below

commit 15a4af570bcd8c8aaf72f9002a00bd74c4d584e1
Author: Wail Alkowaileet <wa...@gmail.com>
AuthorDate: Thu Mar 9 10:32:29 2023 -0800

    [ASTERIXDB-3130][STO] Introduce column readers/writers
    
    - user mode changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Add columns readers and writers
    
    Change-Id: I501f631beb1fb8347841bcd80112b04fabdf1df2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17415
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../asterix/column/util/ColumnValuesUtil.java      |  48 ++++
 .../asterix/column/util/RunLengthIntArray.java     | 179 +++++++++++++++
 .../asterix/column/values/IColumnBatchWriter.java  |  44 ++++
 .../asterix/column/values/IColumnValuesReader.java | 133 +++++++++++
 .../column/values/IColumnValuesReaderFactory.java  |  56 +++++
 .../asterix/column/values/IColumnValuesWriter.java | 138 ++++++++++++
 .../column/values/IColumnValuesWriterFactory.java  |  36 +++
 .../values/reader/AbstractColumnValuesReader.java  | 169 ++++++++++++++
 .../values/reader/ColumnValueReaderFactory.java    |  82 +++++++
 .../values/reader/PrimitiveColumnValuesReader.java |  95 ++++++++
 .../RepeatedPrimitiveColumnValuesReader.java       | 132 +++++++++++
 .../values/reader/value/AbstractValueReader.java   |  51 +++++
 .../values/reader/value/BooleanValueReader.java    |  65 ++++++
 .../values/reader/value/DoubleValueReader.java     |  59 +++++
 .../values/reader/value/LongValueReader.java       |  59 +++++
 .../values/reader/value/NoOpValueReader.java       |  52 +++++
 .../values/reader/value/StringValueReader.java     |  61 ++++++
 .../values/reader/value/UUIDValueReader.java       |  61 ++++++
 .../values/writer/AbstractColumnValuesWriter.java  | 244 +++++++++++++++++++++
 .../values/writer/BooleanColumnValuesWriter.java   |  86 ++++++++
 .../column/values/writer/ColumnBatchWriter.java    | 154 +++++++++++++
 .../values/writer/ColumnValuesWriterFactory.java   |  55 +++++
 .../values/writer/DoubleColumnValuesWriter.java    | 115 ++++++++++
 .../values/writer/LongColumnValuesWriter.java      | 109 +++++++++
 .../writer/NullMissingColumnValuesWriter.java      | 102 +++++++++
 .../values/writer/StringColumnValuesWriter.java    |  97 ++++++++
 .../values/writer/UUIDColumnValuesWriter.java      |  43 ++++
 .../writer/filters/AbstractColumnFilterWriter.java |  45 ++++
 .../writer/filters/DoubleColumnFilterWriter.java   |  50 +++++
 .../writer/filters/LongColumnFilterWriter.java     |  50 +++++
 .../writer/filters/NoOpColumnFilterWriter.java     |  58 +++++
 .../writer/filters/StringColumnFilterWriter.java   |  56 +++++
 .../writer/filters/UUIDColumnFilterWriter.java     |  41 ++++
 .../AUUIDPartialBinaryComparatorFactory.java       |   6 +
 .../data/std/primitive/UTF8StringPointable.java    | 136 +++++-------
 35 files changed, 2884 insertions(+), 83 deletions(-)

diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
new file mode 100644
index 0000000000..0ecdeefb91
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/ColumnValuesUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.util;
+
+public class ColumnValuesUtil {
+    private ColumnValuesUtil() {
+    }
+
+    public static int getBitWidth(int level) {
+        //+1 for the null bit
+        return (32 - Integer.numberOfLeadingZeros(level)) + 1;
+    }
+
+    public static int getNullMask(int level) {
+        return 1 << getBitWidth(level) - 1;
+    }
+
+    public static boolean isNull(int mask, int level) {
+        return (mask & level) == mask;
+    }
+
+    public static int getChildValue(int parentMask, int childMask, int level) {
+        if (isNull(parentMask, level)) {
+            return clearNullBit(parentMask, level) | childMask;
+        }
+        return level;
+    }
+
+    public static int clearNullBit(int nullBitMask, int level) {
+        return (nullBitMask - 1) & level;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/RunLengthIntArray.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/RunLengthIntArray.java
new file mode 100644
index 0000000000..df238cb5cc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/util/RunLengthIntArray.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.util;
+
+import java.util.Arrays;
+
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+
+/**
+ * Run-length integer array is to be used for storing repetitive integer values. This is intended for
+ * storing a large number of repeated integers (~1000s). It is not recommended for storing smaller number of integers.
+ * This structure maintains two arrays:
+ * - blockValues: stores the array values
+ * - blockCounts: stores the counts of values in <code>blockValues</code> in a monotonic (cumulative) fashion
+ * <p>
+ * Example:
+ * Original Array: [1,1,1,1,1,1,2,2,2,1,1,1]
+ * blockValues: [1,2,1]
+ * blockCounts: [6,9,12]
+ */
+public final class RunLengthIntArray {
+    private final IntArrayList blockValues;
+    //Cumulative counts: blockCounts[i] is the exclusive end index of block i. Only the first
+    //blockValues.size() entries are meaningful
+    private int[] blockCounts;
+    private int lastSeen;
+    private int size;
+
+    public RunLengthIntArray() {
+        blockValues = new IntArrayList();
+        blockCounts = new int[32];
+        reset();
+    }
+
+    /**
+     * Empties the array. The counts array is kept as is; its entries are re-initialized by
+     * {@link #newBlock()} whenever a block is started.
+     */
+    public void reset() {
+        blockValues.clear();
+        lastSeen = -1;
+        size = 0;
+    }
+
+    /**
+     * Appends a single value
+     *
+     * @param value value to append
+     */
+    public void add(int value) {
+        add(value, 1);
+    }
+
+    /**
+     * Appends the same value multiple times. The current (last) block is extended if it holds the same value;
+     * otherwise, a new block is started.
+     *
+     * @param value value to append
+     * @param count number of occurrences; nothing is appended when it is 0
+     */
+    public void add(int value, int count) {
+        if (count == 0) {
+            return;
+        }
+        if (size == 0 || value != lastSeen) {
+            lastSeen = value;
+            newBlock();
+            blockValues.add(value);
+        }
+        blockCounts[blockValues.size() - 1] += count;
+        size += count;
+    }
+
+    /**
+     * @return total number of (uncompressed) values
+     */
+    public int getSize() {
+        return size;
+    }
+
+    /**
+     * @return number of runs (blocks)
+     */
+    public int getNumberOfBlocks() {
+        return blockValues.size();
+    }
+
+    /**
+     * @param blockIndex index of the block
+     * @return the value repeated in the block
+     */
+    public int getBlockValue(int blockIndex) {
+        return blockValues.getInt(blockIndex);
+    }
+
+    /**
+     * @param blockIndex index of the block
+     * @return number of values in the block
+     */
+    public int getBlockSize(int blockIndex) {
+        if (blockIndex == 0) {
+            return blockCounts[blockIndex];
+        }
+        return blockCounts[blockIndex] - blockCounts[blockIndex - 1];
+    }
+
+    /**
+     * @param blockIndex index of the block
+     * @param startIndex index of a value (in the uncompressed array)
+     * @return number of values between startIndex (inclusive) and the end of the block
+     */
+    public int getBlockSize(int blockIndex, int startIndex) {
+        return blockCounts[blockIndex] - startIndex;
+    }
+
+    /**
+     * Finds the block that contains the value at the given (uncompressed) index
+     *
+     * @param startIndex index of a value (in the uncompressed array)
+     * @return index of the block where the value at startIndex resides
+     * @throws IndexOutOfBoundsException if startIndex >= size
+     */
+    public int getBlockIndex(int startIndex) {
+        if (startIndex >= size) {
+            throw new IndexOutOfBoundsException("startIndex: " + startIndex + " >= size:" + size);
+        }
+        int index = Arrays.binarySearch(blockCounts, 0, blockValues.size(), startIndex);
+        if (index >= 0) {
+            /*
+             * Exact hit: blockCounts[index] is the exclusive end of block 'index'. Hence, startIndex is the
+             * first value of the next block. The next block always exists as startIndex < size.
+             */
+            return index + 1;
+        }
+        //Not found: binarySearch returns -(insertionPoint) - 1, and the insertion point is the first block
+        //whose cumulative count is greater than startIndex
+        return -index - 1;
+    }
+
+    /**
+     * Appends the values of another array starting from a given index
+     *
+     * @param other      array to copy the values from
+     * @param startIndex index (in the other's uncompressed array) of the first value to append
+     * @throws IndexOutOfBoundsException if startIndex >= the size of other
+     */
+    public void add(RunLengthIntArray other, int startIndex) {
+        if (startIndex >= other.size) {
+            throw new IndexOutOfBoundsException("startIndex: " + startIndex + " >= other size:" + other.size);
+        }
+        //First, handle the first block as startIndex might be at the middle of a block
+        //Get which block that startIndex resides
+        int otherBlockIndex = other.getBlockIndex(startIndex);
+        //Get the remaining of the first block starting from startIndex
+        int otherBlockSizeRemaining = other.getBlockSize(otherBlockIndex, startIndex);
+        //Batch add all the remaining values
+        add(other.getBlockValue(otherBlockIndex), otherBlockSizeRemaining);
+
+        //Add other blocks as batches
+        for (int i = otherBlockIndex + 1; i < other.getNumberOfBlocks(); i++) {
+            add(other.getBlockValue(i), other.getBlockSize(i));
+        }
+    }
+
+    /**
+     * Prepares the counts' entry of the block that is about to be appended (grows the counts array if needed)
+     */
+    private void newBlock() {
+        int newBlockIndex = blockValues.size();
+        if (newBlockIndex == blockCounts.length) {
+            int[] newRepCount = new int[blockCounts.length * 2];
+            System.arraycopy(blockCounts, 0, newRepCount, 0, blockCounts.length);
+            blockCounts = newRepCount;
+        }
+        if (newBlockIndex > 0) {
+            /*
+             * To easily compute where the actual block resides, the block counts are always increasing.
+             * For example:
+             * - Let blockCounts = [5, 6, 13] and blockValues = [1, 0, 1]
+             * - The block sizes are 5, 1, and 7 respectively
+             * - Let say that we want to know what is the value at index 11
+             * - by searching blockCounts, we know it is at the block with index 2
+             * - Then the value is 1
+             */
+            blockCounts[newBlockIndex] = blockCounts[newBlockIndex - 1];
+        } else {
+            blockCounts[0] = 0;
+        }
+    }
+
+    @Override
+    public String toString() {
+        if (size == 0) {
+            return "[]";
+        }
+        StringBuilder builder = new StringBuilder();
+        int i = 0;
+        builder.append("size: ");
+        builder.append(getSize());
+        builder.append(" [");
+        for (; i < getNumberOfBlocks() - 1; i++) {
+            appendBlockInfo(i, builder);
+            builder.append(',');
+        }
+        appendBlockInfo(i, builder);
+        builder.append(']');
+        return builder.toString();
+    }
+
+    //Appends "(value,blockSize)" of the given block
+    private void appendBlockInfo(int blockIndex, StringBuilder builder) {
+        builder.append('(');
+        builder.append(getBlockValue(blockIndex));
+        builder.append(',');
+        builder.append(getBlockSize(blockIndex));
+        builder.append(')');
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
new file mode 100644
index 0000000000..fc1173fcec
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnBatchWriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IColumnBatchWriter {
+    void setPageZeroBuffer(ByteBuffer pageZeroBuffer, int numberOfColumns, int numberOfPrimaryKeys);
+
+    /**
+     * Writes the values of the primary keys to Page0 (presumably the buffer provided via
+     * {@link #setPageZeroBuffer(ByteBuffer, int, int)} -- to be confirmed against the implementation)
+     *
+     * @param primaryKeyWriters writers of the primary keys
+     * @return the allocated space for the writers of the primary keys
+     */
+    int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException;
+
+    /**
+     * Writes the non-key values to multiple pages
+     *
+     * @param nonKeysColumnWriters writers of the non-key values
+     * @return the allocated space for the writers of the non-key values
+     */
+    int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
new file mode 100644
index 0000000000..0f4cc0c620
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public interface IColumnValuesReader extends Comparable<IColumnValuesReader> {
+    /**
+     * Reset the reader
+     *
+     * @param in         input stream that contains the values
+     * @param tupleCount tuple count of the batch this column belongs to
+     */
+    void reset(AbstractBytesInputStream in, int tupleCount) throws HyracksDataException;
+
+    /* ***********************
+     * Iteration functions
+     * ***********************
+     */
+
+    /**
+     * Move to the next value
+     *
+     * @return true if a next value was found, false if the end of the values was reached
+     */
+    boolean next() throws HyracksDataException;
+
+    /* ***********************
+     * Information functions
+     * ***********************
+     */
+    ATypeTag getTypeTag();
+
+    /**
+     * @return the index of the column
+     */
+    int getColumnIndex();
+
+    /**
+     * @return Level of the value, which determines if it is NULL, MISSING, or VALUE
+     */
+    int getLevel();
+
+    /**
+     * @return whether the current value is MISSING
+     */
+    boolean isMissing();
+
+    /**
+     * @return whether the current value is NULL
+     */
+    boolean isNull();
+
+    /**
+     * @return whether the current value is an actual value (i.e., neither NULL nor MISSING)
+     */
+    boolean isValue();
+
+    /**
+     * @return whether this column belongs to an array or multiset
+     */
+    boolean isRepeated();
+
+    /**
+     * @return whether it is an end of an array (arrays could be nested, and we can hit different delimiters)
+     */
+    boolean isDelimiter();
+
+    /**
+     * @return which delimiter was returned (nested arrays have different delimiter indexes)
+     */
+    int getDelimiterIndex();
+
+    /* ***********************
+     * Value functions
+     * ***********************
+     */
+
+    long getLong();
+
+    double getDouble();
+
+    boolean getBoolean();
+
+    IValueReference getBytes();
+
+    /* ***********************
+     * Write function
+     * ***********************
+     */
+
+    /**
+     * Write the content of the reader to the writer
+     *
+     * @param writer   the writer to which the content of this reader is written
+     * @param callNext whether {@link #next()} should be called on write
+     */
+    void write(IColumnValuesWriter writer, boolean callNext) throws HyracksDataException;
+
+    /**
+     * Write the content of the reader to the writer
+     *
+     * @param writer the writer to which the content of this reader is written
+     * @param count  number of values to write
+     */
+    void write(IColumnValuesWriter writer, int count) throws HyracksDataException;
+
+    /**
+     * Skip values
+     *
+     * @param count the number of values to skip
+     */
+    void skip(int count) throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReaderFactory.java
new file mode 100644
index 0000000000..98837ac04a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReaderFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+public interface IColumnValuesReaderFactory {
+    /**
+     * Create a reader for a non-repeated primitive type
+     *
+     * @param typeTag     primitive type tag
+     * @param columnIndex column index
+     * @param maxLevel    maximum definition level
+     * @param primaryKey  whether the value belongs to a primary key
+     * @return columnar reader
+     */
+    IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, boolean primaryKey);
+
+    /**
+     * Create a reader for a repeated primitive type
+     *
+     * @param typeTag     primitive type tag
+     * @param columnIndex column index
+     * @param maxLevel    maximum definition level
+     * @param delimiters  the definition levels for array delimiters
+     * @return columnar reader
+     */
+    IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, int[] delimiters);
+
+    /**
+     * Create a reader from a serialized path info
+     *
+     * @param input column metadata info
+     * @return columnar reader
+     */
+    IColumnValuesReader createValueReader(DataInput input) throws IOException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
new file mode 100644
index 0000000000..d4e6099d8b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Column writer for values
+ */
+public interface IColumnValuesWriter {
+
+    /**
+     * Reset the writer
+     */
+    void reset() throws HyracksDataException;
+
+    /**
+     * @return the corresponding index of a column
+     */
+    int getColumnIndex();
+
+    /**
+     * Write a value that is neither MISSING nor NULL
+     *
+     * @param tag   value type tag
+     * @param value value reference
+     */
+    void writeValue(ATypeTag tag, IValueReference value) throws HyracksDataException;
+
+    /**
+     * Write an anti-matter primary key value
+     *
+     * @param tag   value type tag
+     * @param value value reference
+     */
+    void writeAntiMatter(ATypeTag tag, IValueReference value) throws HyracksDataException;
+
+    /**
+     * Write level
+     *
+     * @param level level of the value
+     */
+    void writeLevel(int level) throws HyracksDataException;
+
+    /**
+     * Convenient way to write a level multiple times
+     *
+     * @param level level of the value
+     * @param count the number of level occurrences
+     */
+    void writeLevels(int level, int count) throws HyracksDataException;
+
+    /**
+     * For all writers except for {@link ATypeTag#NULL} writer, this method will return null
+     *
+     * @return the definition levels if this is a {@link ATypeTag#NULL} writer, {@code null} otherwise
+     */
+    RunLengthIntArray getDefinitionLevelsIntArray();
+
+    /**
+     * Write NULL
+     *
+     * @param level at what level the NULL occurred
+     */
+    void writeNull(int level) throws HyracksDataException;
+
+    /**
+     * Write a non-unknown value from a reader. Not intended for writing {@link ATypeTag#NULL} or
+     * {@link ATypeTag#MISSING}
+     */
+    void writeValue(IColumnValuesReader reader) throws HyracksDataException;
+
+    /**
+     * @return (probably) an overestimated size of the encoded values
+     */
+    int getEstimatedSize();
+
+    /**
+     * @return the allocated space in bytes
+     */
+    int getAllocatedSpace();
+
+    /**
+     * @return the total count of values
+     */
+    int getCount();
+
+    /**
+     * @return normalized minimum column value
+     */
+    long getNormalizedMinValue();
+
+    /**
+     * @return normalized maximum column value
+     */
+    long getNormalizedMaxValue();
+
+    /**
+     * Flush the column's values to the output stream
+     *
+     * @param out output stream
+     */
+    void flush(OutputStream out) throws HyracksDataException;
+
+    /**
+     * Close the writer and release all allocated buffers
+     */
+    void close();
+
+    /**
+     * Serialize the writer
+     *
+     * @param output destination to which the writer should be serialized
+     */
+    void serialize(DataOutput output) throws IOException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriterFactory.java
new file mode 100644
index 0000000000..c858376d3d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesWriterFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+public interface IColumnValuesWriterFactory {
+    /**
+     * Create a writer
+     *
+     * @param tag         column type
+     * @param columnIndex column index
+     * @param level       maximum level, which determines that a value is not null or missing
+     * @param writeAlways whether the writer should always write, even if all values were missing/null
+     * @param filtered    whether the column has a column filter
+     * @return a writer
+     */
+    IColumnValuesWriter createValueWriter(ATypeTag tag, int columnIndex, int level, boolean writeAlways,
+            boolean filtered);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
new file mode 100644
index 0000000000..c0cf18a6a1
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetRunLengthBitPackingHybridDecoder;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.util.ColumnValuesUtil;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Base class for column readers. It decodes the definition levels and delegates decoding the actual values
+ * to an {@link AbstractValueReader}.
+ */
+abstract class AbstractColumnValuesReader implements IColumnValuesReader {
+    protected final AbstractValueReader valueReader;
+    protected final int columnIndex;
+    protected final int maxLevel;
+    protected final ParquetRunLengthBitPackingHybridDecoder definitionLevels;
+    protected final AbstractBytesInputStream valuesStream;
+    //Level of the current value with the null bit cleared
+    protected int level;
+    protected int valueCount;
+    protected int valueIndex;
+
+    private int nullBitMask;
+    //Whether the last read level had the null bit set
+    private boolean nullLevel;
+    //Set when reset() was given an empty stream; levels are not read in this case
+    private boolean allMissing;
+
+    AbstractColumnValuesReader(AbstractValueReader valueReader, int columnIndex, int maxLevel, boolean primaryKey) {
+        this.valueReader = valueReader;
+        this.columnIndex = columnIndex;
+        this.maxLevel = maxLevel;
+        definitionLevels = new ParquetRunLengthBitPackingHybridDecoder(ColumnValuesUtil.getBitWidth(maxLevel));
+        valuesStream = primaryKey ? new ByteBufferInputStream() : new MultiByteBufferInputStream();
+    }
+
+    /**
+     * Reads the next definition level and updates {@link #level} and the null flag. This is a no-op when the
+     * reader was reset with an empty stream (i.e., all values are missing).
+     */
+    final void nextLevel() throws HyracksDataException {
+        if (allMissing) {
+            return;
+        }
+        try {
+            int actualLevel = definitionLevels.readInt();
+            //Check whether the level is for a null value
+            nullLevel = ColumnValuesUtil.isNull(nullBitMask, actualLevel);
+            //Clear the null bit to allow repeated value readers determine the correct delimiter for null values
+            level = ColumnValuesUtil.clearNullBit(nullBitMask, actualLevel);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Called at the end of {@link #reset(AbstractBytesInputStream, int)} (only if the stream was not empty)
+     */
+    abstract void resetValues();
+
+    /**
+     * Resets the reader. If the stream is empty, all values are considered missing: the level is fixed to 0
+     * and the number of values is set to the tuple count. Otherwise, the header (level used for the null mask,
+     * size of the definition levels, and values count) is read, followed by the definition levels and the values.
+     *
+     * @param in         input stream that contains the values
+     * @param tupleCount tuple count this column batch belongs to
+     */
+    @Override
+    public final void reset(AbstractBytesInputStream in, int tupleCount) throws HyracksDataException {
+        valueIndex = 0;
+        if (in.available() == 0) {
+            allMissing = true;
+            level = 0;
+            //nextLevel() is a no-op when all values are missing. Hence, the null flag of the previous
+            //batch must be cleared here; otherwise, isNull() could report a stale NULL
+            nullLevel = false;
+            valueCount = tupleCount;
+            return;
+        }
+        allMissing = false;
+        try {
+            nullBitMask = ColumnValuesUtil.getNullMask(BytesUtils.readZigZagVarInt(in));
+            int defLevelsSize = BytesUtils.readZigZagVarInt(in);
+            valueCount = BytesUtils.readZigZagVarInt(in);
+            definitionLevels.reset(in);
+            valuesStream.resetAt(defLevelsSize, in);
+            int valueLength = BytesUtils.readZigZagVarInt(valuesStream);
+            if (valueLength > 0) {
+                valueReader.resetValue(valuesStream);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        resetValues();
+    }
+
+    @Override
+    public final ATypeTag getTypeTag() {
+        return valueReader.getTypeTag();
+    }
+
+    @Override
+    public final int getColumnIndex() {
+        return columnIndex;
+    }
+
+    @Override
+    public int getLevel() {
+        return level;
+    }
+
+    /**
+     * @return true if the current level is not a delimiter and is below the maximum level
+     */
+    @Override
+    public final boolean isMissing() {
+        return !isDelimiter() && level < maxLevel;
+    }
+
+    @Override
+    public final boolean isNull() {
+        return nullLevel;
+    }
+
+    /**
+     * @return true if the current level is not NULL and equals the maximum level
+     */
+    @Override
+    public final boolean isValue() {
+        return !isNull() && level == maxLevel;
+    }
+
+    @Override
+    public final long getLong() {
+        return valueReader.getLong();
+    }
+
+    @Override
+    public final double getDouble() {
+        return valueReader.getDouble();
+    }
+
+    @Override
+    public final boolean getBoolean() {
+        return valueReader.getBoolean();
+    }
+
+    @Override
+    public final IValueReference getBytes() {
+        return valueReader.getBytes();
+    }
+
+    /**
+     * Compares the current values of the two readers.
+     *
+     * @param o the other reader, which must be an {@link AbstractColumnValuesReader}
+     */
+    @Override
+    public final int compareTo(IColumnValuesReader o) {
+        return valueReader.compareTo(((AbstractColumnValuesReader) o).valueReader);
+    }
+
+    @Override
+    public final void write(IColumnValuesWriter writer, int count) throws HyracksDataException {
+        for (int i = 0; i < count; i++) {
+            write(writer, true);
+        }
+    }
+
+    @Override
+    public void skip(int count) throws HyracksDataException {
+        for (int i = 0; i < count; i++) {
+            next();
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
new file mode 100644
index 0000000000..b233482e31
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.column.values.reader.value.BooleanValueReader;
+import org.apache.asterix.column.values.reader.value.DoubleValueReader;
+import org.apache.asterix.column.values.reader.value.LongValueReader;
+import org.apache.asterix.column.values.reader.value.NoOpValueReader;
+import org.apache.asterix.column.values.reader.value.StringValueReader;
+import org.apache.asterix.column.values.reader.value.UUIDValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+
+/** Creates column values readers from explicit arguments or from a description read off a {@link DataInput}. */
+public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
+    @Override
+    public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, boolean primaryKey) {
+        return new PrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, primaryKey);
+    }
+
+    @Override
+    public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, int[] delimiters) {
+        return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, delimiters);
+    }
+
+    @Override
+    public IColumnValuesReader createValueReader(DataInput input) throws IOException {
+        ATypeTag tag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()];
+        int index = input.readInt();
+        int levels = input.readInt();
+        boolean isPrimaryKey = input.readBoolean();
+        if (!input.readBoolean()) { //Collection flag not set: build the non-repeated reader
+            return createValueReader(tag, index, levels, isPrimaryKey);
+        }
+        int[] delimiterLevels = new int[input.readInt()];
+        for (int i = 0; i < delimiterLevels.length; i++) {
+            delimiterLevels[i] = input.readInt();
+        }
+        return createValueReader(tag, index, levels, delimiterLevels);
+    }
+
+    private AbstractValueReader createReader(ATypeTag typeTag) {
+        switch (typeTag) {
+            case BOOLEAN:
+                return new BooleanValueReader();
+            case BIGINT:
+                return new LongValueReader();
+            case DOUBLE:
+                return new DoubleValueReader();
+            case STRING:
+                return new StringValueReader();
+            case UUID:
+                return new UUIDValueReader();
+            case MISSING:
+            case NULL:
+                return NoOpValueReader.INSTANCE; //Shared singleton
+            default:
+                throw new UnsupportedOperationException(typeTag + " is not supported");
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
new file mode 100644
index 0000000000..e8c7bc5943
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reader for a non-repeated primitive value
+ */
+public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader {
+    /**
+     * A primary key value is always present. Anti-matter can be determined by checking whether the definition level
+     * indicates that the tuple's values are missing (i.e., by calling {@link #isMissing()}).
+     */
+    private final boolean primaryKey;
+
+    public PrimitiveColumnValuesReader(AbstractValueReader reader, int columnIndex, int maxLevel, boolean primaryKey) {
+        super(reader, columnIndex, maxLevel, primaryKey);
+        this.primaryKey = primaryKey;
+    }
+
+    @Override
+    public void resetValues() {
+        //NoOp
+    }
+
+    @Override
+    public boolean next() throws HyracksDataException {
+        if (valueIndex == valueCount) {
+            return false;
+        }
+        valueIndex++;
+
+        try {
+            nextLevel();
+            if (primaryKey || level == maxLevel) {
+                valueReader.nextValue();
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return true;
+    }
+
+    @Override
+    public boolean isRepeated() {
+        return false;
+    }
+
+    @Override
+    public boolean isDelimiter() {
+        return false;
+    }
+
+    @Override
+    public int getDelimiterIndex() {
+        throw new IllegalStateException("Not a repeated reader");
+    }
+
+    @Override
+    public void write(IColumnValuesWriter writer, boolean callNext) throws HyracksDataException {
+        if (callNext && !next()) {
+            throw new IllegalStateException("No more values");
+        }
+
+        writer.writeLevel(level);
+        if (primaryKey || isValue()) {
+            try {
+                writer.writeValue(this);
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
new file mode 100644
index 0000000000..0fb98bef76
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * For primitive values that belong to an {@link ATypeTag#ARRAY} or a {@link ATypeTag#MULTISET}
+ */
+public final class RepeatedPrimitiveColumnValuesReader extends AbstractColumnValuesReader {
+    private final int[] delimiters;
+    private final int[] levelToDelimiterMap; //Level -> index into delimiters, filled once by the constructor
+    private int delimiterIndex; //Starts at, and is reset to, delimiters.length
+
+    RepeatedPrimitiveColumnValuesReader(AbstractValueReader valueReader, int columnIndex, int maxLevel,
+            int[] delimiters) {
+        super(valueReader, columnIndex, maxLevel, false);
+        this.delimiters = delimiters;
+        delimiterIndex = delimiters.length;
+        levelToDelimiterMap = new int[maxLevel + 1];
+        //Walk the levels from maxLevel down, stepping past each delimiter level once it is reached
+        int currentDelimiterIndex = 0;
+        for (int lvl = maxLevel; lvl >= 0; lvl--) {
+            if (currentDelimiterIndex < delimiters.length && lvl == delimiters[currentDelimiterIndex]) {
+                currentDelimiterIndex++;
+            }
+            levelToDelimiterMap[lvl] = currentDelimiterIndex;
+        }
+    }
+
+    @Override
+    protected void resetValues() {
+        delimiterIndex = delimiters.length;
+    }
+
+    @Override
+    public boolean next() throws HyracksDataException {
+        if (valueIndex == valueCount) {
+            return false;
+        }
+        //A delimiter left over from the previous call is consumed before the next level is read
+        consumeDelimiterIfAny();
+        nextLevel();
+        setDelimiterIndex();
+        if (level == maxLevel) {
+            valueReader.nextValue();
+        }
+        valueIndex++;
+        return true;
+    }
+
+    @Override
+    public boolean isRepeated() {
+        return true;
+    }
+
+    @Override
+    public boolean isDelimiter() {
+        return delimiterIndex < delimiters.length && level == delimiters[delimiterIndex];
+    }
+
+    @Override
+    public int getDelimiterIndex() {
+        return delimiterIndex;
+    }
+
+    @Override
+    public void write(IColumnValuesWriter writer, boolean callNext) throws HyracksDataException {
+        //We always call next as repeated values cannot be primary keys
+        if (!next()) {
+            throw new IllegalStateException("No more values");
+        }
+        if (isRepeatedValue()) {
+            while (!isLastDelimiter()) {
+                writer.writeLevel(level);
+                if (isValue()) {
+                    try {
+                        writer.writeValue(this);
+                    } catch (IOException e) {
+                        throw HyracksDataException.create(e);
+                    }
+                }
+                if (!next()) { //Values ended before the last delimiter: fail instead of looping forever
+                    throw new IllegalStateException("No more values");
+                }
+            }
+        }
+        //Add last delimiter, or NULL/MISSING
+        writer.writeLevel(level);
+    }
+
+    private boolean isRepeatedValue() {
+        return levelToDelimiterMap[level] < delimiters.length;
+    }
+
+    private boolean isLastDelimiter() {
+        return isDelimiter() && delimiterIndex == delimiters.length - 1;
+    }
+
+    private void consumeDelimiterIfAny() {
+        if (isDelimiter()) {
+            delimiterIndex++;
+        }
+    }
+
+    private void setDelimiterIndex() {
+        if (!isDelimiter() && level > delimiters[delimiters.length - 1]) {
+            delimiterIndex = levelToDelimiterMap[level];
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
new file mode 100644
index 0000000000..3d4c744c5f
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractValueReader implements Comparable<AbstractValueReader> {
+
+    public abstract void resetValue(AbstractBytesInputStream in) throws IOException; //Starts reading from 'in'
+
+    public abstract void nextValue() throws HyracksDataException; //Decodes the value the typed getter returns
+
+    public abstract ATypeTag getTypeTag();
+
+    public boolean getBoolean() {
+        throw new UnsupportedOperationException(getClass().getName()); //Default; BooleanValueReader overrides it
+    }
+
+    public long getLong() {
+        throw new UnsupportedOperationException(getClass().getName()); //Default; LongValueReader overrides it
+    }
+
+    public double getDouble() {
+        throw new UnsupportedOperationException(getClass().getName()); //Default; DoubleValueReader overrides it
+    }
+
+    public IValueReference getBytes() {
+        throw new UnsupportedOperationException(getClass().getName()); //Default; String/UUID readers override it
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
new file mode 100644
index 0000000000..34177735c0
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetRunLengthBitPackingHybridDecoder;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/** Value reader for {@link ATypeTag#BOOLEAN} columns, backed by a run-length/bit-packing hybrid decoder */
+public final class BooleanValueReader extends AbstractValueReader {
+    private final ParquetRunLengthBitPackingHybridDecoder booleanReader =
+            new ParquetRunLengthBitPackingHybridDecoder(1);
+    private boolean nextValue;
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) {
+        booleanReader.reset(in);
+    }
+
+    @Override
+    public void nextValue() throws HyracksDataException {
+        final int decoded;
+        try {
+            decoded = booleanReader.readInt();
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        //Only 1 maps to true; any other decoded value maps to false
+        nextValue = decoded == 1;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return nextValue;
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.BOOLEAN;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Boolean.compare(getBoolean(), o.getBoolean());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
new file mode 100644
index 0000000000..24155f2fdb
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetDoublePlainValuesReader;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Value reader for {@link ATypeTag#DOUBLE} columns. The value decoded by the latest {@link #nextValue()} call is
+ * what {@link #getDouble()} returns and what {@link #compareTo(AbstractValueReader)} compares.
+ */
+public final class DoubleValueReader extends AbstractValueReader {
+    private final ParquetDoublePlainValuesReader doubleReader = new ParquetDoublePlainValuesReader();
+    private double nextValue;
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.DOUBLE;
+    }
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) throws IOException {
+        doubleReader.initFromPage(in);
+    }
+
+    @Override
+    public void nextValue() {
+        nextValue = doubleReader.readDouble();
+    }
+
+    @Override
+    public double getDouble() {
+        return nextValue;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Double.compare(getDouble(), o.getDouble());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
new file mode 100644
index 0000000000..09413d9711
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetDeltaBinaryPackingValuesReader;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+
+/**
+ * Value reader for {@link ATypeTag#BIGINT} columns. The value decoded by the latest {@link #nextValue()} call is
+ * what {@link #getLong()} returns and what {@link #compareTo(AbstractValueReader)} compares.
+ */
+public final class LongValueReader extends AbstractValueReader {
+    private final ParquetDeltaBinaryPackingValuesReader longReader = new ParquetDeltaBinaryPackingValuesReader();
+    private long nextValue;
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.BIGINT;
+    }
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) throws IOException {
+        longReader.initFromPage(in);
+    }
+
+    @Override
+    public void nextValue() {
+        nextValue = longReader.readLong();
+    }
+
+    @Override
+    public long getLong() {
+        return nextValue;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return Long.compare(getLong(), o.getLong());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
new file mode 100644
index 0000000000..fd56ff2cb5
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class NoOpValueReader extends AbstractValueReader {
+    public static final AbstractValueReader INSTANCE = new NoOpValueReader(); //The only instance of this class
+
+    private NoOpValueReader() { //Private: obtain the reader through INSTANCE
+    }
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) throws IOException {
+        throw new UnsupportedOperationException(getClass().getName()); //Every operation is rejected
+    }
+
+    @Override
+    public void nextValue() throws HyracksDataException {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        throw new UnsupportedOperationException(getClass().getName());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
new file mode 100644
index 0000000000..8fd887415e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+
+/**
+ * Value reader for {@link ATypeTag#STRING} columns; values are compared with {@link UTF8StringPointable}.
+ * NOTE(review): the decoder flag is true here but false in UUIDValueReader; confirm what it selects.
+ */
+public final class StringValueReader extends AbstractValueReader {
+    private final ParquetDeltaByteArrayReader stringReader = new ParquetDeltaByteArrayReader(true);
+    private IValueReference nextValue;
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.STRING;
+    }
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) throws IOException {
+        stringReader.initFromPage(in);
+    }
+
+    @Override
+    public void nextValue() {
+        nextValue = stringReader.readBytes();
+    }
+
+    @Override
+    public IValueReference getBytes() {
+        return nextValue;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return UTF8StringPointable.compare(getBytes(), o.getBytes());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
new file mode 100644
index 0000000000..4f240e9cff
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Value reader for {@link ATypeTag#UUID} columns; values are compared through
+ * {@link AUUIDPartialBinaryComparatorFactory}.
+ */
+public final class UUIDValueReader extends AbstractValueReader {
+    private final ParquetDeltaByteArrayReader uuidReader = new ParquetDeltaByteArrayReader(false);
+    private IValueReference nextValue;
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.UUID;
+    }
+
+    @Override
+    public void resetValue(AbstractBytesInputStream in) throws IOException {
+        uuidReader.initFromPage(in);
+    }
+
+    @Override
+    public void nextValue() {
+        nextValue = uuidReader.readBytes();
+    }
+
+    @Override
+    public IValueReference getBytes() {
+        return nextValue;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return AUUIDPartialBinaryComparatorFactory.compare(getBytes(), o.getBytes());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
new file mode 100644
index 0000000000..87eda82f4a
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/AbstractColumnValuesWriter.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.asterix.column.bytes.encoder.ParquetRunLengthBitPackingHybridEncoder;
+import org.apache.asterix.column.util.ColumnValuesUtil;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.NoOpColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+
+/**
+ * Base writer of a single column: the definition levels are encoded here, whereas the encoding of the values
+ * themselves is left to the subclasses.
+ */
+public abstract class AbstractColumnValuesWriter implements IColumnValuesWriter {
+    protected final AbstractColumnFilterWriter filterWriter;
+    protected final ParquetRunLengthBitPackingHybridEncoder definitionLevels;
+    protected final int level;
+
+    private final int columnIndex;
+    private final boolean collection;
+    private final int nullBitMask;
+    private int count; // number of levels written since the last reset
+    private boolean writeValues; // true once a level equal to this.level has been written
+
+    AbstractColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered) {
+        this.columnIndex = columnIndex;
+        this.level = level;
+        this.collection = collection;
+        this.nullBitMask = ColumnValuesUtil.getNullMask(level);
+        this.definitionLevels = new ParquetRunLengthBitPackingHybridEncoder(ColumnValuesUtil.getBitWidth(level));
+        this.filterWriter = filtered ? createFilter() : NoOpColumnFilterWriter.INSTANCE;
+    }
+
+    @Override
+    public final int getColumnIndex() {
+        return columnIndex;
+    }
+
+    @Override
+    public final int getEstimatedSize() {
+        return definitionLevels.getEstimatedSize() + getValuesEstimatedSize();
+    }
+
+    @Override
+    public final int getAllocatedSpace() {
+        return definitionLevels.getAllocatedSize() + getValuesAllocatedSize();
+    }
+
+    @Override
+    public final int getCount() {
+        return count;
+    }
+
+    @Override
+    public final void writeValue(ATypeTag tag, IValueReference value) throws HyracksDataException {
+        addLevel(level);
+        try {
+            addValue(tag, value);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    @Override
+    public final void writeLevel(int level) throws HyracksDataException {
+        addLevel(level);
+    }
+
+    @Override
+    public void writeLevels(int level, int count) throws HyracksDataException {
+        writeValues |= this.level == level;
+        this.count += count;
+        try {
+            // the levels go straight to the encoder here; addLevel() is not invoked
+            for (int remaining = count; remaining > 0; remaining--) {
+                definitionLevels.writeInt(level);
+            }
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    /** This base implementation keeps no level array and returns {@code null}. */
+    @Override
+    public RunLengthIntArray getDefinitionLevelsIntArray() {
+        return null;
+    }
+
+    @Override
+    public final void writeNull(int level) throws HyracksDataException {
+        addLevel(level | nullBitMask);
+    }
+
+    @Override
+    public void writeValue(IColumnValuesReader reader) throws HyracksDataException {
+        try {
+            addValue(reader);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    @Override
+    public void writeAntiMatter(ATypeTag tag, IValueReference value) throws HyracksDataException {
+        addLevel(0);
+        try {
+            addValue(tag, value);
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    @Override
+    public final void close() {
+        definitionLevels.close();
+        closeValues();
+    }
+
+    /** Returns the filter's minimum, or {@code Long.MAX_VALUE} when everything written is missing/null. */
+    @Override
+    public final long getNormalizedMinValue() {
+        return writeValues ? filterWriter.getMinNormalizedValue() : Long.MAX_VALUE;
+    }
+
+    /** Returns the filter's maximum, or {@code Long.MIN_VALUE} when everything written is missing/null. */
+    @Override
+    public final long getNormalizedMaxValue() {
+        return writeValues ? filterWriter.getMaxNormalizedValue() : Long.MIN_VALUE;
+    }
+
+    /**
+     * Writes, in order: this column's level, the byte length of the encoded definition levels, the number of
+     * written levels, the encoded definition levels, and the byte length of the encoded values followed by the
+     * values themselves (a zero length and nothing else if the values are skipped). The writer is then reset.
+     */
+    @Override
+    public final void flush(OutputStream out) throws HyracksDataException {
+        try {
+            BytesUtils.writeZigZagVarInt(level, out);
+            final BytesInput encodedLevels = definitionLevels.toBytes();
+            BytesUtils.writeZigZagVarInt((int) encodedLevels.size(), out);
+            BytesUtils.writeZigZagVarInt(count, out);
+            encodedLevels.writeAllTo(out);
+            if (writeValues || collection) {
+                final BytesInput encodedValues = getBytes();
+                BytesUtils.writeZigZagVarInt((int) encodedValues.size(), out);
+                encodedValues.writeAllTo(out);
+            } else {
+                // all values are null/missing: write a zero length and skip the values
+                BytesUtils.writeZigZagVarInt(0, out);
+            }
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+        reset();
+    }
+
+    @Override
+    public final void reset() throws HyracksDataException {
+        definitionLevels.reset();
+        writeValues = false;
+        count = 0;
+        filterWriter.reset();
+        resetValues();
+    }
+
+    @Override
+    public final void serialize(DataOutput output) throws IOException {
+        output.write(getTypeTag().serialize());
+        output.writeInt(columnIndex);
+        output.writeInt(level);
+        output.writeBoolean(collection);
+        output.writeBoolean(filterWriter != NoOpColumnFilterWriter.INSTANCE);
+    }
+
+    public static IColumnValuesWriter deserialize(DataInput input, IColumnValuesWriterFactory writerFactory)
+            throws IOException {
+        ATypeTag typeTag = ATypeTag.VALUE_TYPE_MAPPING[input.readByte()]; // same field order as serialize()
+        int columnIndex = input.readInt();
+        int level = input.readInt();
+        boolean collection = input.readBoolean();
+        boolean filtered = input.readBoolean();
+        return writerFactory.createValueWriter(typeTag, columnIndex, level, collection, filtered);
+    }
+
+    /** Records one level: updates {@code writeValues}, encodes the level and increments the count. */
+    protected void addLevel(int level) throws HyracksDataException {
+        try {
+            writeValues |= this.level == level;
+            definitionLevels.writeInt(level);
+            count++;
+        } catch (IOException ioe) {
+            throw HyracksDataException.create(ioe);
+        }
+    }
+
+    protected abstract ATypeTag getTypeTag();
+
+    protected abstract void addValue(ATypeTag tag, IValueReference value) throws IOException;
+
+    protected abstract void addValue(IColumnValuesReader reader) throws IOException;
+
+    protected abstract BytesInput getBytes() throws IOException;
+
+    protected abstract int getValuesEstimatedSize();
+
+    protected abstract int getValuesAllocatedSize();
+
+    protected abstract AbstractColumnFilterWriter createFilter();
+
+    protected abstract void resetValues() throws HyracksDataException;
+
+    protected abstract void closeValues();
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
new file mode 100644
index 0000000000..0058f1883d
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/BooleanColumnValuesWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.encoder.ParquetRunLengthBitPackingHybridEncoder;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.LongColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesInput;
+
+public final class BooleanColumnValuesWriter extends AbstractColumnValuesWriter {
+    private final ParquetRunLengthBitPackingHybridEncoder encoder = new ParquetRunLengthBitPackingHybridEncoder(1);
+
+    public BooleanColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered) {
+        super(columnIndex, level, collection, filtered);
+    }
+
+    @Override
+    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        byte booleanValue = value.getByteArray()[value.getStartOffset()]; // the boolean is the value's first byte
+        encoder.writeInt(booleanValue);
+        filterWriter.addLong(booleanValue);
+    }
+
+    @Override
+    protected void resetValues() {
+        encoder.reset();
+    }
+
+    @Override
+    protected BytesInput getBytes() throws IOException {
+        return encoder.toBytes();
+    }
+
+    @Override
+    protected int getValuesEstimatedSize() {
+        return encoder.getEstimatedSize();
+    }
+
+    @Override
+    protected int getValuesAllocatedSize() {
+        return encoder.getAllocatedSize();
+    }
+
+    @Override
+    protected void addValue(IColumnValuesReader reader) throws IOException {
+        int value = reader.getBoolean() ? 1 : 0;
+        encoder.writeInt(value);
+        filterWriter.addLong(value); // keep the filter in sync, exactly as addValue(tag, value) does
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new LongColumnFilterWriter(); // booleans are tracked by the filter as the longs 0/1
+    }
+
+    @Override
+    protected void closeValues() {
+        encoder.close();
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.BOOLEAN;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
new file mode 100644
index 0000000000..490afe7605
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnBatchWriter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import static org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter.FILTER_SIZE;
+
+import java.nio.ByteBuffer;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.bytes.stream.out.ByteBufferOutputStream;
+import org.apache.asterix.column.bytes.stream.out.MultiPersistentBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.pointer.IReservedPointer;
+import org.apache.asterix.column.values.IColumnBatchWriter;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * A writer for the values of a batch of columns
+ */
+public final class ColumnBatchWriter implements IColumnBatchWriter {
+    private final ByteBufferOutputStream primaryKeys;
+    private final MultiPersistentBufferBytesOutputStream columns;
+    private final int pageSize;
+    private final float tolerance;
+    private final IReservedPointer columnLengthPointer;
+
+    private ByteBuffer pageZero;
+    private int columnsOffset;
+    private int filtersOffset;
+    private int primaryKeysOffset;
+    private int nonKeyColumnStartOffset;
+
+    public ColumnBatchWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int pageSize, float tolerance) {
+        this.pageSize = pageSize;
+        this.tolerance = tolerance;
+        this.primaryKeys = new ByteBufferOutputStream();
+        this.columns = new MultiPersistentBufferBytesOutputStream(multiPageOpRef);
+        this.columnLengthPointer = columns.createPointer();
+    }
+
+    @Override
+    public void setPageZeroBuffer(ByteBuffer pageZero, int numberOfColumns, int numberOfPrimaryKeys) {
+        this.pageZero = pageZero;
+        // layout from the buffer's current position: one integer (offset) per column ...
+        columnsOffset = pageZero.position();
+        // ... then one filter entry per column ...
+        filtersOffset = columnsOffset + numberOfColumns * Integer.BYTES;
+        // ... then the flushed primary keys
+        final int keysStart = filtersOffset + numberOfColumns * FILTER_SIZE;
+
+        pageZero.position(keysStart);
+        primaryKeysOffset = keysStart;
+        primaryKeys.reset(pageZero);
+        // non-key columns are addressed starting from the capacity of page zero
+        nonKeyColumnStartOffset = pageZero.capacity();
+    }
+
+    @Override
+    public int writePrimaryKeyColumns(IColumnValuesWriter[] primaryKeyWriters) throws HyracksDataException {
+        int totalAllocated = 0;
+        int keyIndex = 0;
+        for (IColumnValuesWriter keyWriter : primaryKeyWriters) {
+            setColumnOffset(keyIndex++, primaryKeysOffset + primaryKeys.size());
+            keyWriter.flush(primaryKeys);
+            totalAllocated += keyWriter.getAllocatedSpace();
+        }
+        return totalAllocated;
+    }
+
+    @Override
+    public int writeColumns(PriorityQueue<IColumnValuesWriter> nonKeysColumnWriters) throws HyracksDataException {
+        int totalAllocated = 0;
+        columns.reset();
+        while (!nonKeysColumnWriters.isEmpty()) {
+            final IColumnValuesWriter next = nonKeysColumnWriters.poll();
+            writeColumn(next);
+            totalAllocated += next.getAllocatedSpace();
+        }
+        return totalAllocated;
+    }
+
+    private void writeColumn(IColumnValuesWriter writer) throws HyracksDataException {
+        if (!hasEnoughSpace(columns.getCurrentBufferPosition(), writer)) {
+            /*
+             * We reset the columns stream to write all pages and confiscate a new buffer to minimize splitting
+             * the column's values into multiple pages.
+             */
+            nonKeyColumnStartOffset += columns.capacity();
+            columns.reset();
+        }
+
+        final int startInStream = columns.size();
+        // placeholder for the column's length, filled in once the column is flushed
+        columns.reserveInteger(columnLengthPointer);
+        setColumnOffset(writer.getColumnIndex(), nonKeyColumnStartOffset + startInStream);
+
+        // min/max are read before flush(), which AbstractColumnValuesWriter ends with a reset()
+        writeFilter(writer);
+        writer.flush(columns);
+        columnLengthPointer.setInteger(columns.size() - startInStream);
+    }
+
+    private boolean hasEnoughSpace(int bufferPosition, IColumnValuesWriter columnWriter) {
+        // The estimated size is mostly an overestimate
+        final int estimatedSize = columnWriter.getEstimatedSize();
+        final float remainingRatio = (pageSize - bufferPosition) / (float) pageSize;
+        if (estimatedSize > pageSize) {
+            /*
+             * The column is larger than a page. Check whether the remaining space of the current buffer is at
+             * least the tolerance percentage
+             * - true  --> the column is written starting at the current buffer
+             * - false --> the caller moves to a new buffer and the remaining space is tolerated as empty
+             */
+            return remainingRatio >= tolerance;
+        }
+
+        final int spaceAfterColumn = pageSize - (bufferPosition + estimatedSize);
+        /*
+         * Check if the space left after the column would still exceed the column's size, or the remaining space
+         * is at least the tolerance percentage
+         * - true  --> the column is written starting at the current buffer
+         * - false --> the caller moves to a new buffer and the remaining space is tolerated as empty
+         */
+        return spaceAfterColumn > estimatedSize || remainingRatio >= tolerance;
+    }
+
+    private void setColumnOffset(int columnIndex, int offset) {
+        pageZero.putInt(columnsOffset + columnIndex * Integer.BYTES, offset);
+    }
+
+    private void writeFilter(IColumnValuesWriter writer) {
+        final int minOffset = filtersOffset + writer.getColumnIndex() * FILTER_SIZE;
+        pageZero.putLong(minOffset, writer.getNormalizedMinValue());
+        pageZero.putLong(minOffset + Long.BYTES, writer.getNormalizedMaxValue());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
new file mode 100644
index 0000000000..6a514fffa6
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/ColumnValuesWriterFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+public class ColumnValuesWriterFactory implements IColumnValuesWriterFactory {
+    private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef; // handed to every writer that takes one
+
+    public ColumnValuesWriterFactory(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        this.multiPageOpRef = multiPageOpRef;
+    }
+
+    @Override
+    public IColumnValuesWriter createValueWriter(ATypeTag typeTag, int columnIndex, int maxLevel, boolean writeAlways,
+            boolean filtered) {
+        switch (typeTag) { // maxLevel and writeAlways are passed as the writers' 'level' and 'collection' arguments
+            case MISSING:
+            case NULL: // MISSING and NULL share one writer
+                return new NullMissingColumnValuesWriter(columnIndex, maxLevel, writeAlways, filtered);
+            case BOOLEAN:
+                return new BooleanColumnValuesWriter(columnIndex, maxLevel, writeAlways, filtered);
+            case BIGINT:
+                return new LongColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
+            case DOUBLE:
+                return new DoubleColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
+            case STRING:
+                return new StringColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
+            case UUID:
+                return new UUIDColumnValuesWriter(multiPageOpRef, columnIndex, maxLevel, writeAlways, filtered);
+            default: // every other type tag is rejected
+                throw new UnsupportedOperationException(typeTag + " is not supported");
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
new file mode 100644
index 0000000000..ca5cbb1d28
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.encoder.ParquetPlainValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.DoubleColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+
+public final class DoubleColumnValuesWriter extends AbstractColumnValuesWriter {
+    private final ParquetPlainValuesWriter doubleWriter;
+
+    public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered) {
+        super(columnIndex, level, collection, filtered);
+        doubleWriter = new ParquetPlainValuesWriter(multiPageOpRef);
+    }
+
+    @Override
+    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        final double normalizedDouble = getValue(tag, value.getByteArray(), value.getStartOffset());
+        doubleWriter.writeDouble(normalizedDouble);
+        filterWriter.addDouble(normalizedDouble);
+    }
+
+    private double getValue(ATypeTag typeTag, byte[] byteArray, int offset) {
+        switch (typeTag) {
+            case TINYINT:
+                return byteArray[offset];
+            case SMALLINT:
+                return ShortPointable.getShort(byteArray, offset);
+            case INTEGER:
+                return IntegerPointable.getInteger(byteArray, offset);
+            case BIGINT:
+                return LongPointable.getLong(byteArray, offset);
+            case FLOAT:
+                return FloatPointable.getFloat(byteArray, offset);
+            case DOUBLE:
+                return DoublePointable.getDouble(byteArray, offset);
+            default:
+                throw new IllegalAccessError(typeTag + "is not of floating type");
+        }
+    }
+
+    @Override
+    protected void resetValues() throws HyracksDataException {
+        doubleWriter.reset();
+    }
+
+    @Override
+    protected BytesInput getBytes() throws IOException {
+        return doubleWriter.getBytes();
+    }
+
+    @Override
+    protected int getValuesEstimatedSize() {
+        return doubleWriter.getEstimatedSize();
+    }
+
+    @Override
+    protected int getValuesAllocatedSize() {
+        return doubleWriter.getAllocatedSize();
+    }
+
+    @Override
+    protected void addValue(IColumnValuesReader reader) throws IOException {
+        double value = reader.getDouble();
+        doubleWriter.writeDouble(value);
+        filterWriter.addDouble(value);
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new DoubleColumnFilterWriter();
+    }
+
+    @Override
+    protected void closeValues() {
+        doubleWriter.close();
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.DOUBLE;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
new file mode 100644
index 0000000000..e71ec734cb
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.LongColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+
+final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
+    private final ParquetDeltaBinaryPackingValuesWriterForLong longWriter; // encoder of the column's values
+
+    public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered) {
+        super(columnIndex, level, collection, filtered);
+        longWriter = new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
+    }
+
+    @Override
+    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        final long normalizedInt = getValue(tag, value.getByteArray(), value.getStartOffset());
+        longWriter.writeLong(normalizedInt); // every accepted integer type is stored as a long
+        filterWriter.addLong(normalizedInt);
+    }
+
+    private long getValue(ATypeTag typeTag, byte[] byteArray, int offset) {
+        switch (typeTag) { // reads the value at 'offset' according to its tag and widens it to long
+            case TINYINT:
+                return byteArray[offset];
+            case SMALLINT:
+                return ShortPointable.getShort(byteArray, offset);
+            case INTEGER:
+                return IntegerPointable.getInteger(byteArray, offset);
+            case BIGINT:
+                return LongPointable.getLong(byteArray, offset);
+            default: // any non-integer tag
+                throw new IllegalAccessError(typeTag + " is not of type integer");
+        }
+    }
+
+    @Override
+    protected void resetValues() throws HyracksDataException {
+        longWriter.reset();
+    }
+
+    @Override
+    protected BytesInput getBytes() throws IOException {
+        return longWriter.getBytes();
+    }
+
+    @Override
+    protected int getValuesEstimatedSize() {
+        return longWriter.getEstimatedSize();
+    }
+
+    @Override
+    protected int getValuesAllocatedSize() {
+        return longWriter.getAllocatedSize();
+    }
+
+    @Override
+    protected void addValue(IColumnValuesReader reader) throws IOException {
+        long value = reader.getLong();
+        longWriter.writeLong(value);
+        filterWriter.addLong(value); // the filter sees the same value that was encoded
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new LongColumnFilterWriter();
+    }
+
+    @Override
+    protected void closeValues() {
+        longWriter.close();
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.BIGINT;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
new file mode 100644
index 0000000000..edc9fe2ac5
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/NullMissingColumnValuesWriter.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.NoOpColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.parquet.bytes.BytesInput;
+
+/**
+ * A column writer that never stores values: its values section is always empty (zero bytes, zero estimated and
+ * allocated size) and both {@code addValue} overloads throw. Every level it receives is forwarded to the parent
+ * class and additionally recorded in a {@link RunLengthIntArray}, which is exposed through
+ * {@link #getDefinitionLevelsIntArray()}.
+ */
+public class NullMissingColumnValuesWriter extends AbstractColumnValuesWriter {
+    private static final BytesInput NO_VALUES = BytesInput.empty();
+    private static final String ADD_VALUE_NOT_ALLOWED = "Null writer should not add value";
+    private final RunLengthIntArray recordedLevels = new RunLengthIntArray();
+
+    NullMissingColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered) {
+        super(columnIndex, level, collection, filtered);
+    }
+
+    /**
+     * @return the array this writer records its levels into
+     */
+    @Override
+    public RunLengthIntArray getDefinitionLevelsIntArray() {
+        return recordedLevels;
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.NULL;
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        // No value is ever added, so the shared no-op filter is used
+        return NoOpColumnFilterWriter.INSTANCE;
+    }
+
+    @Override
+    protected void addLevel(int level) throws HyracksDataException {
+        // Record the level locally before handing it to the parent class
+        recordedLevels.add(level);
+        super.addLevel(level);
+    }
+
+    @Override
+    public void writeLevels(int level, int count) throws HyracksDataException {
+        // Bulk variant of addLevel: record (level, count) locally, then delegate
+        recordedLevels.add(level, count);
+        super.writeLevels(level, count);
+    }
+
+    @Override
+    protected void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        throw new IllegalStateException(ADD_VALUE_NOT_ALLOWED);
+    }
+
+    @Override
+    protected void addValue(IColumnValuesReader reader) throws IOException {
+        throw new IllegalStateException(ADD_VALUE_NOT_ALLOWED);
+    }
+
+    @Override
+    protected BytesInput getBytes() throws IOException {
+        return NO_VALUES;
+    }
+
+    @Override
+    protected int getValuesEstimatedSize() {
+        return 0;
+    }
+
+    @Override
+    protected int getValuesAllocatedSize() {
+        return 0;
+    }
+
+    @Override
+    protected void resetValues() throws HyracksDataException {
+        recordedLevels.reset();
+    }
+
+    @Override
+    protected void closeValues() {
+        // Closing performs the same single call as resetValues()
+        recordedLevels.reset();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
new file mode 100644
index 0000000000..e1a3ffdb49
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import java.io.IOException;
+
+import org.apache.asterix.column.bytes.encoder.ParquetDeltaByteArrayWriter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.StringColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.parquet.bytes.BytesInput;
+
+/**
+ * Column writer for string values. Every value is appended to a {@link ParquetDeltaByteArrayWriter} and is also
+ * passed to the inherited filter writer. The public constructor always sets {@code skipLengthBytes}; the protected
+ * one lets a subclass choose the flag.
+ */
+public class StringColumnValuesWriter extends AbstractColumnValuesWriter {
+    private final ParquetDeltaByteArrayWriter stringWriter;
+    private final boolean skipLengthBytes;
+
+    public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered) {
+        this(multiPageOpRef, columnIndex, level, collection, filtered, true);
+    }
+
+    /**
+     * @param skipLengthBytes passed as-is to the byte-array writer together with every value
+     */
+    protected StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered, boolean skipLengthBytes) {
+        super(columnIndex, level, collection, filtered);
+        this.skipLengthBytes = skipLengthBytes;
+        stringWriter = new ParquetDeltaByteArrayWriter(multiPageOpRef);
+    }
+
+    @Override
+    protected final void addValue(ATypeTag tag, IValueReference value) throws IOException {
+        encodeAndFilter(value);
+    }
+
+    @Override
+    protected final void addValue(IColumnValuesReader reader) throws IOException {
+        encodeAndFilter(reader.getBytes());
+    }
+
+    /**
+     * Writes one value through the byte-array writer, then shows the same value to the filter writer.
+     */
+    private void encodeAndFilter(IValueReference value) throws IOException {
+        stringWriter.writeBytes(value, skipLengthBytes);
+        filterWriter.addValue(value);
+    }
+
+    @Override
+    protected final BytesInput getBytes() throws IOException {
+        return stringWriter.getBytes();
+    }
+
+    @Override
+    protected final int getValuesEstimatedSize() {
+        return stringWriter.getEstimatedSize();
+    }
+
+    @Override
+    protected final int getValuesAllocatedSize() {
+        return stringWriter.getAllocatedSize();
+    }
+
+    @Override
+    protected final void resetValues() throws HyracksDataException {
+        stringWriter.reset();
+    }
+
+    @Override
+    protected final void closeValues() {
+        stringWriter.close();
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new StringColumnFilterWriter();
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.STRING;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
new file mode 100644
index 0000000000..1e98754e98
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer;
+
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.column.values.writer.filters.UUIDColumnFilterWriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * {@link StringColumnValuesWriter} specialization for UUID columns: it passes {@code false} for
+ * {@code skipLengthBytes}, reports {@link ATypeTag#UUID} and uses a {@link UUIDColumnFilterWriter}.
+ */
+final class UUIDColumnValuesWriter extends StringColumnValuesWriter {
+    // Handed to the parent constructor as its 'skipLengthBytes' argument
+    private static final boolean SKIP_LENGTH_BYTES = false;
+
+    public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
+            boolean collection, boolean filtered) {
+        super(multiPageOpRef, columnIndex, level, collection, filtered, SKIP_LENGTH_BYTES);
+    }
+
+    @Override
+    protected ATypeTag getTypeTag() {
+        return ATypeTag.UUID;
+    }
+
+    @Override
+    protected AbstractColumnFilterWriter createFilter() {
+        return new UUIDColumnFilterWriter();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java
new file mode 100644
index 0000000000..abbe31449c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/AbstractColumnFilterWriter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Base class of the column filter writers. The {@code add*} methods throw {@link UnsupportedOperationException}
+ * (carrying the concrete class name) unless a subclass overrides them; the normalized bounds and {@link #reset()}
+ * must be supplied by every subclass.
+ */
+public abstract class AbstractColumnFilterWriter {
+    /** Byte size of two {@code long}s. */
+    public static final int FILTER_SIZE = 2 * Long.BYTES;
+
+    public abstract long getMinNormalizedValue();
+
+    public abstract long getMaxNormalizedValue();
+
+    public abstract void reset();
+
+    public void addLong(long value) {
+        throw new UnsupportedOperationException(writerName());
+    }
+
+    public void addDouble(double value) {
+        throw new UnsupportedOperationException(writerName());
+    }
+
+    public void addValue(IValueReference value) throws HyracksDataException {
+        throw new UnsupportedOperationException(writerName());
+    }
+
+    /**
+     * @return the runtime class name, used as the message of the "unsupported" exceptions
+     */
+    private String writerName() {
+        return getClass().getName();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/DoubleColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/DoubleColumnFilterWriter.java
new file mode 100644
index 0000000000..6fccabe37e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/DoubleColumnFilterWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+/**
+ * Tracks the smallest and largest {@code double} passed to {@link #addDouble(double)} since the last
+ * {@link #reset()}. Both bounds are reported as the bit patterns returned by
+ * {@link Double#doubleToLongBits(double)}.
+ * <p>
+ * NOTE(review): {@link Math#min(double, double)} and {@link Math#max(double, double)} propagate NaN, so a single
+ * NaN value turns both bounds into NaN until the next reset -- confirm NaN never reaches this writer.
+ */
+public class DoubleColumnFilterWriter extends AbstractColumnFilterWriter {
+    private double min;
+    private double max;
+
+    public DoubleColumnFilterWriter() {
+        reset();
+    }
+
+    @Override
+    public void addDouble(double value) {
+        min = Math.min(min, value);
+        max = Math.max(max, value);
+    }
+
+    @Override
+    public long getMinNormalizedValue() {
+        return Double.doubleToLongBits(min);
+    }
+
+    @Override
+    public long getMaxNormalizedValue() {
+        return Double.doubleToLongBits(max);
+    }
+
+    /**
+     * Empties the tracked range. The bounds start at opposite infinities so that the first added value replaces both
+     * of them. ({@link Double#MIN_VALUE} is the smallest <em>positive</em> double, not the most negative one, so it
+     * cannot serve as a starting point for either bound.)
+     */
+    @Override
+    public void reset() {
+        min = Double.POSITIVE_INFINITY;
+        max = Double.NEGATIVE_INFINITY;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/LongColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/LongColumnFilterWriter.java
new file mode 100644
index 0000000000..25cb94c01c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/LongColumnFilterWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+/**
+ * Keeps the smallest and largest {@code long} passed to {@link #addLong(long)} since the last {@link #reset()}
+ * and reports them unchanged as the normalized bounds.
+ */
+public class LongColumnFilterWriter extends AbstractColumnFilterWriter {
+    private long smallest;
+    private long largest;
+
+    public LongColumnFilterWriter() {
+        reset();
+    }
+
+    @Override
+    public void reset() {
+        // Inverted sentinels: no long is above MAX_VALUE or below MIN_VALUE, so the first value sets both bounds
+        smallest = Long.MAX_VALUE;
+        largest = Long.MIN_VALUE;
+    }
+
+    @Override
+    public void addLong(long value) {
+        if (value < smallest) {
+            smallest = value;
+        }
+        if (value > largest) {
+            largest = value;
+        }
+    }
+
+    @Override
+    public long getMinNormalizedValue() {
+        return smallest;
+    }
+
+    @Override
+    public long getMaxNormalizedValue() {
+        return largest;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/NoOpColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/NoOpColumnFilterWriter.java
new file mode 100644
index 0000000000..c4f6f6ffe7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/NoOpColumnFilterWriter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Filter writer that ignores everything it is given and always reports {@code 0} for both bounds. It holds no
+ * instance state and cannot be instantiated from outside; use {@link #INSTANCE}.
+ */
+public class NoOpColumnFilterWriter extends AbstractColumnFilterWriter {
+    public static final AbstractColumnFilterWriter INSTANCE = new NoOpColumnFilterWriter();
+
+    private NoOpColumnFilterWriter() {
+        // Private so that INSTANCE is the only instance
+    }
+
+    @Override
+    public long getMinNormalizedValue() {
+        return 0;
+    }
+
+    @Override
+    public long getMaxNormalizedValue() {
+        return 0;
+    }
+
+    @Override
+    public void reset() {
+        //NoOp
+    }
+
+    @Override
+    public void addLong(long value) {
+        //NoOp
+    }
+
+    @Override
+    public void addDouble(double value) {
+        //NoOp
+    }
+
+    @Override
+    public void addValue(IValueReference value) {
+        //NoOp
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/StringColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/StringColumnFilterWriter.java
new file mode 100644
index 0000000000..77d82c0667
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/StringColumnFilterWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+import static org.apache.hyracks.util.string.UTF8StringUtil.charAt;
+import static org.apache.hyracks.util.string.UTF8StringUtil.charSize;
+import static org.apache.hyracks.util.string.UTF8StringUtil.getNumBytesToStoreLength;
+import static org.apache.hyracks.util.string.UTF8StringUtil.getUTFLength;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Filter writer for strings: every value is reduced to a {@code long} by {@link #normalize(IValueReference)} and
+ * then tracked with the min/max logic inherited from {@link LongColumnFilterWriter}.
+ */
+public class StringColumnFilterWriter extends LongColumnFilterWriter {
+    // Number of 16-bit chars packed into the normalized key
+    private static final int NORMALIZED_CHARS = 4;
+
+    @Override
+    public void addValue(IValueReference value) throws HyracksDataException {
+        addLong(normalize(value));
+    }
+
+    /**
+     * Normalizes the string in a {@link Long}: up to four leading chars are packed 16 bits apiece, first char in the
+     * most significant position, and the packed bits are finally shifted right by one so the result is never
+     * negative. Missing chars leave zero bits.
+     *
+     * @param value the string; presumably it starts with its length header, which is skipped (TODO confirm)
+     * @return the non-negative normalized key
+     * @see org.apache.hyracks.util.string.UTF8StringUtil#normalize(byte[], int)
+     */
+    public static long normalize(IValueReference value) {
+        final byte[] data = value.getByteArray();
+        final int begin = value.getStartOffset();
+
+        long key = 0;
+        int cursor = begin + getNumBytesToStoreLength(getUTFLength(data, begin));
+        final int limit = begin + value.getLength();
+        for (int remaining = NORMALIZED_CHARS; remaining > 0; remaining--) {
+            key <<= Character.SIZE;
+            if (cursor < limit) {
+                // The low 16 bits were just vacated by the shift, so OR-ing equals adding here
+                key |= charAt(data, cursor) & 0xffff;
+                cursor += charSize(data, cursor);
+            }
+        }
+        return key >>> 1;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/UUIDColumnFilterWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/UUIDColumnFilterWriter.java
new file mode 100644
index 0000000000..5e2bc618ab
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/filters/UUIDColumnFilterWriter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.writer.filters;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+/**
+ * Filter writer for UUIDs. Only the LSB component of each written UUID takes part in the min/max filter, so the
+ * filter can yield false positives; it can still rule out UUIDs whose LSB component lies outside the recorded
+ * range.
+ */
+public class UUIDColumnFilterWriter extends LongColumnFilterWriter {
+
+    @Override
+    public void addValue(IValueReference value) throws HyracksDataException {
+        addLong(getLSB(value));
+    }
+
+    /**
+     * @return the {@code long} read {@link Long#BYTES} bytes past the start offset of {@code value}
+     */
+    public static long getLSB(IValueReference value) {
+        return LongPointable.getLong(value.getByteArray(), value.getStartOffset() + Long.BYTES);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AUUIDPartialBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AUUIDPartialBinaryComparatorFactory.java
index 01f2537979..d842c8966a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AUUIDPartialBinaryComparatorFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AUUIDPartialBinaryComparatorFactory.java
@@ -23,6 +23,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.LongPointable;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -37,6 +38,11 @@ public class AUUIDPartialBinaryComparatorFactory implements IBinaryComparatorFac
         return AUUIDPartialBinaryComparatorFactory::compare;
     }
 
+    public static int compare(IValueReference valueA, IValueReference valueB) {
+        return compare(valueA.getByteArray(), valueA.getStartOffset(), valueA.getLength(), valueB.getByteArray(),
+                valueB.getStartOffset(), valueB.getLength());
+    }
+
     @SuppressWarnings("squid:S1172") // unused parameter
     public static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
         int msbCompare = Long.compare(LongPointable.getLong(b1, s1), LongPointable.getLong(b2, s2));
diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
index 49f6221e4f..8013e05fc0 100644
--- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
+++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/UTF8StringPointable.java
@@ -33,6 +33,7 @@ import org.apache.hyracks.data.std.api.IComparable;
 import org.apache.hyracks.data.std.api.IHashable;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IPointableFactory;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.util.GrowableArray;
 import org.apache.hyracks.data.std.util.UTF8StringBuilder;
 import org.apache.hyracks.util.string.UTF8StringUtil;
@@ -108,8 +109,7 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
      * Returns the character at the given byte offset. The caller is responsible for making sure that
      * the provided offset is within bounds and points to the beginning of a valid UTF8 character.
      *
-     * @param offset
-     *            - Byte offset
+     * @param offset - Byte offset
      * @return Character at the given offset.
      */
     public char charAt(int offset) {
@@ -218,13 +218,15 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
                 pointable2.utf8Length);
     }
 
+    public static int compare(IValueReference valueA, IValueReference valueB) {
+        return UTF8StringUtil.compareTo(valueA.getByteArray(), valueA.getStartOffset(), valueA.getLength(),
+                valueB.getByteArray(), valueB.getStartOffset(), valueB.getLength());
+    }
+
     /**
-     * @param src,
-     *            the source string.
-     * @param pattern,
-     *            the pattern string.
-     * @param ignoreCase,
-     *            to ignore case or not.
+     * @param src,        the source string.
+     * @param pattern,    the pattern string.
+     * @param ignoreCase, to ignore case or not.
      * @return the byte offset of the first character of the matching string. Not including the MetaLength.
      */
     public static int find(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
@@ -232,12 +234,9 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     }
 
     /**
-     * @param src,
-     *            the source string.
-     * @param pattern,
-     *            the pattern string.
-     * @param ignoreCase,
-     *            to ignore case or not.
+     * @param src,        the source string.
+     * @param pattern,    the pattern string.
+     * @param ignoreCase, to ignore case or not.
      * @return the offset in the unit of code point of the first character of the matching string. Not including the MetaLength.
      */
     public static int findInCodePoint(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase) {
@@ -245,30 +244,22 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     }
 
     /**
-     * @param src,
-     *            the source string.
-     * @param pattern,
-     *            the pattern string.
-     * @param ignoreCase,
-     *            to ignore case or not.
-     * @param startMatch,
-     *            the start offset.
+     * @param src,        the source string.
+     * @param pattern,    the pattern string.
+     * @param ignoreCase, to ignore case or not.
+     * @param startMatch, the start offset.
      * @return the byte offset of the first character of the matching string after <code>startMatchPos}</code>.
-     *         Not including the MetaLength.
+     * Not including the MetaLength.
      */
     public static int find(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase, int startMatch) {
         return findInByteOrCodePoint(src, pattern, ignoreCase, startMatch, true);
     }
 
     /**
-     * @param src,
-     *            the source string.
-     * @param pattern,
-     *            the pattern string.
-     * @param ignoreCase,
-     *            to ignore case or not.
-     * @param startMatch,
-     *            the start offset.
+     * @param src,        the source string.
+     * @param pattern,    the pattern string.
+     * @param ignoreCase, to ignore case or not.
+     * @param startMatch, the start offset.
      * @return the offset in the unit of code point of the first character of the matching string. Not including the MetaLength.
      */
     public static int findInCodePoint(UTF8StringPointable src, UTF8StringPointable pattern, boolean ignoreCase,
@@ -324,7 +315,7 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
             }
 
             // The result is counted in code point instead of bytes
-            if (resultInByte == false) {
+            if (!resultInByte) {
                 char ch = src.charAt(srcStart + startMatchPos);
                 if (Character.isHighSurrogate(ch)) {
                     prevHighSurrogate = true;
@@ -431,9 +422,10 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
 
     /**
      * Return the substring. Note that the offset and length are in the unit of code point.
+     *
      * @return {@code true} if substring was successfully written into given {@code out}, or
-     *         {@code false} if substring could not be obtained ({@code codePointOffset} or {@code codePointLength}
-     *         are less than 0 or starting position is greater than the input length)
+     * {@code false} if substring could not be obtained ({@code codePointOffset} or {@code codePointLength}
+     * are less than 0 or starting position is greater than the input length)
      */
     public boolean substr(int codePointOffset, int codePointLength, UTF8StringBuilder builder, GrowableArray out)
             throws IOException {
@@ -442,9 +434,10 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
 
     /**
      * Return the substring. Note that the offset and length are in the unit of code point.
+     *
      * @return {@code true} if substring was successfully written into given {@code out}, or
-     *         {@code false} if substring could not be obtained ({@code codePointOffset} or {@code codePointLength}
-     *         are less than 0 or starting position is greater than the input length)
+     * {@code false} if substring could not be obtained ({@code codePointOffset} or {@code codePointLength}
+     * are less than 0 or starting position is greater than the input length)
      */
     public static boolean substr(UTF8StringPointable src, int codePointOffset, int codePointLength,
             UTF8StringBuilder builder, GrowableArray out) throws IOException {
@@ -548,12 +541,9 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     /**
      * Generates a lower case string of an input string.
      *
-     * @param src
-     *            , the input source string.
-     * @param builder
-     *            , a builder for the resulting string.
-     * @param out
-     *            , the storage for a result string.
+     * @param src     , the input source string.
+     * @param builder , a builder for the resulting string.
+     * @param out     , the storage for a result string.
      * @throws IOException
      */
     public static void lowercase(UTF8StringPointable src, UTF8StringBuilder builder, GrowableArray out)
@@ -577,12 +567,9 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     /**
      * Generates an upper case string of an input string.
      *
-     * @param src
-     *            , the input source string.
-     * @param builder
-     *            , a builder for the resulting string.
-     * @param out
-     *            , the storage for a result string.
+     * @param src     the input source string.
+     * @param builder a builder for the resulting string.
+     * @param out     the storage for a result string.
      * @throws IOException
      */
     public static void uppercase(UTF8StringPointable src, UTF8StringBuilder builder, GrowableArray out)
@@ -607,12 +594,9 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
      * Generates a "title" format string from an input source string, i.e., the first letter of each word
      * is in the upper case while the other letter is in the lower case.
      *
-     * @param src
-     *            , the input source string.
-     * @param builder
-     *            , a builder for the resulting string.
-     * @param out
-     *            , the storage for a result string.
+     * @param src     the input source string.
+     * @param builder a builder for the resulting string.
+     * @param out     the storage for a result string.
      * @throws IOException
      */
     public static void initCap(UTF8StringPointable src, UTF8StringBuilder builder, GrowableArray out)
@@ -642,18 +626,12 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     /**
      * Generates a trimmed string of an input source string.
      *
-     * @param srcPtr
-     *            , the input source string
-     * @param builder
-     *            , the result string builder.
-     * @param out
-     *            , the storage for the output string.
-     * @param left
-     *            , whether to trim the left side.
-     * @param right
-     *            , whether to trim the right side.
-     * @param codePointSet
-     *            , the set of code points that should be trimmed.
+     * @param srcPtr       the input source string.
+     * @param builder      the result string builder.
+     * @param out          the storage for the output string.
+     * @param left         whether to trim the left side.
+     * @param right        whether to trim the right side.
+     * @param codePointSet the set of code points that should be trimmed.
      * @throws IOException
      */
     public static void trim(UTF8StringPointable srcPtr, UTF8StringBuilder builder, GrowableArray out, boolean left,
@@ -696,16 +674,11 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     /**
      * Generates a trimmed string from the original string.
      *
-     * @param builder
-     *            , the result string builder.
-     * @param out
-     *            , the storage for the output string.
-     * @param left
-     *            , whether to trim the left side.
-     * @param right
-     *            , whether to trim the right side.
-     * @param codePointSet
-     *            , the set of code points that should be trimmed.
+     * @param builder      the result string builder.
+     * @param out          the storage for the output string.
+     * @param left         whether to trim the left side.
+     * @param right        whether to trim the right side.
+     * @param codePointSet the set of code points that should be trimmed.
      * @throws IOException
      */
     public void trim(UTF8StringBuilder builder, GrowableArray out, boolean left, boolean right,
@@ -716,12 +689,9 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
     /**
      * Generates a reversed string from an input source string
      *
-     * @param srcPtr
-     *            , the input source string.
-     * @param builder
-     *            , a builder for the resulting string.
-     * @param out
-     *            , the storage for a result string.
+     * @param srcPtr  the input source string.
+     * @param builder a builder for the resulting string.
+     * @param out     the storage for a result string.
      * @throws IOException
      */
     public static void reverse(UTF8StringPointable srcPtr, UTF8StringBuilder builder, GrowableArray out)
@@ -739,7 +709,7 @@ public final class UTF8StringPointable extends AbstractPointable implements IHas
                         cursorIndex--;
                         if (UTF8StringUtil.isCharStart(srcPtr.bytes, cursorIndex)) {
                             ch = UTF8StringUtil.charAt(srcPtr.bytes, cursorIndex);
-                            if (Character.isHighSurrogate(ch) == false) {
+                            if (!Character.isHighSurrogate(ch)) {
                                 throw new IllegalArgumentException(
                                         "Decoding Error: no corresponding high surrogate found for the following low surrogate");
                             }