You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wy...@apache.org on 2023/04/14 22:16:29 UTC

[asterixdb] branch master updated: [ASTERIXDB-3165][STO] Avoid encoding primary keys

This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1834f153 [ASTERIXDB-3165][STO] Avoid encoding primary keys
2e1834f153 is described below

commit 2e1834f153e1b7504978bcfa2694a21333a21203
Author: Wail Alkowaileet <wa...@gmail.com>
AuthorDate: Thu Apr 13 08:46:34 2023 -0700

    [ASTERIXDB-3165][STO] Avoid encoding primary keys
    
    - user model changes: no
    - storage format changes: yes
    - interface changes: no
    
    Details:
    This patch removes the encoding for PKs to
    allow for more efficient point lookups --
    especially for upserting with secondary indexes.
    
    Storage changes:
    PKs are no longer encoded.
    
    Change-Id: Id361d0d41b54a7ea84ec9212506809f5e0befc84
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17486
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../assembler/AbstractPrimitiveValueAssembler.java |   4 +-
 ...va => ParquetPlainFixedLengthValuesReader.java} |  47 +++++++-
 .../bytes/encoder/ParquetDeltaByteArrayWriter.java |   2 +
 ...va => ParquetPlainFixedLengthValuesWriter.java} |  48 ++++++--
 ...=> ParquetPlainVariableLengthValuesWriter.java} |  48 ++++----
 .../bytes/stream/in/AbstractBytesInputStream.java  |   4 +
 .../bytes/stream/in/ByteBufferInputStream.java     |   5 +
 .../column/bytes/stream/in/ValueInputStream.java   |  95 ++++++++++++++++
 .../column/bytes/stream/out/ValueOutputStream.java |  57 ++++++++++
 .../column/operation/query/ColumnAssembler.java    |   4 +-
 .../operation/query/QueryColumnMetadata.java       |  17 +--
 .../query/QueryColumnWithMetaMetadata.java         |   7 +-
 .../tuple/AbstractAsterixColumnTupleReference.java | 101 ++++++++++++++++-
 .../column/tuple/MergeColumnTupleReference.java    |  18 +--
 .../column/tuple/QueryColumnTupleReference.java    |   8 +-
 .../tuple/QueryColumnWithMetaTupleReference.java   |  10 +-
 .../column/values/IColumnKeyValueReader.java       |  42 +++++++
 .../asterix/column/values/IColumnValuesReader.java |   2 +-
 .../values/reader/AbstractColumnValuesReader.java  |   4 +-
 .../values/reader/ColumnValueReaderFactory.java    |  18 +--
 .../values/reader/PrimitiveColumnValuesReader.java |  19 +++-
 .../RepeatedPrimitiveColumnValuesReader.java       |   1 -
 .../values/reader/value/AbstractValueReader.java   |   2 +-
 .../values/reader/value/BooleanValueReader.java    |   2 +-
 .../values/reader/value/DoubleValueReader.java     |   8 +-
 .../values/reader/value/LongValueReader.java       |   2 +-
 .../values/reader/value/NoOpValueReader.java       |   2 +-
 .../values/reader/value/StringValueReader.java     |   2 +-
 .../values/reader/value/UUIDValueReader.java       |  10 +-
 .../AbstractFixedLengthColumnKeyValueReader.java   |  73 ++++++++++++
 .../DoubleKeyValueReader.java}                     |  34 ++----
 .../LongKeyValueReader.java}                       |  34 ++----
 .../reader/value/key/StringKeyValueReader.java     |  95 ++++++++++++++++
 .../UUIDKeyValueReader.java}                       |  33 ++----
 .../values/writer/DoubleColumnValuesWriter.java    |   6 +-
 .../values/writer/LongColumnValuesWriter.java      |   7 +-
 .../values/writer/StringColumnValuesWriter.java    |  14 ++-
 .../values/writer/UUIDColumnValuesWriter.java      |   4 +-
 .../column/test/bytes/AbstractBytesTest.java       |   2 +-
 .../asterix/column/test/dummy/AssemblerTest.java   |   2 +-
 .../lsm/btree/column/api/IColumnTupleIterator.java |  33 +++++-
 .../impls/btree/ColumnBTreePointSearchCursor.java  |  27 ++++-
 .../impls/btree/ColumnBTreeRangeSearchCursor.java  | 126 ++++++++++++++-------
 .../lsm/tuples/AbstractColumnTupleReference.java   |  48 ++++++--
 ...MBTreeBatchPointSearchOperatorNodePushable.java |  14 +--
 45 files changed, 892 insertions(+), 249 deletions(-)

diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
index 9f1809dfe3..5e2ef28409 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -39,10 +39,8 @@ public abstract class AbstractPrimitiveValueAssembler extends AbstractValueAssem
         this.reader = reader;
     }
 
-    public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples)
-            throws HyracksDataException {
+    public final void reset(AbstractBytesInputStream in, int numberOfTuples) throws HyracksDataException {
         reader.reset(in, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
similarity index 52%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
index 196bec29b8..07713e1562 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetDoublePlainValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/decoder/ParquetPlainFixedLengthValuesReader.java
@@ -18,29 +18,55 @@
  */
 package org.apache.asterix.column.bytes.decoder;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
-import org.apache.parquet.bytes.LittleEndianDataInputStream;
+import org.apache.asterix.column.bytes.stream.in.ValueInputStream;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.parquet.io.ParquetDecodingException;
 
-public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader {
-    private LittleEndianDataInputStream in;
+public class ParquetPlainFixedLengthValuesReader extends AbstractParquetValuesReader {
+    private final ValueInputStream in;
+    private final int valueLength;
+    private final IPointable valueStorage;
+
+    public ParquetPlainFixedLengthValuesReader(int valueLength) {
+        in = new ValueInputStream();
+        this.valueLength = valueLength;
+        valueStorage = null;
+    }
+
+    public ParquetPlainFixedLengthValuesReader(IPointable valueStorage) {
+        in = new ValueInputStream();
+        this.valueLength = valueStorage.getByteArray().length;
+        this.valueStorage = valueStorage;
+    }
 
     @Override
-    public void initFromPage(AbstractBytesInputStream stream) throws IOException {
-        this.in = new LittleEndianDataInputStream(stream.remainingStream());
+    public void initFromPage(AbstractBytesInputStream stream) throws EOFException {
+        in.reset(stream.remainingStream());
     }
 
     @Override
     public void skip() {
         try {
-            in.skipBytes(8);
+            in.skipBytes(valueLength);
         } catch (IOException e) {
             throw new ParquetDecodingException("could not skip double", e);
         }
     }
 
+    @Override
+    public long readLong() { // decodes the next value from the page stream as a long
+        try {
+            return in.readLong();
+        } catch (IOException e) { // rethrow unchecked, keeping the cause
+            throw new ParquetDecodingException("could not read long", e);
+        }
+    }
+
     @Override
     public double readDouble() {
         try {
@@ -49,4 +75,13 @@ public class ParquetDoublePlainValuesReader extends AbstractParquetValuesReader
             throw new ParquetDecodingException("could not read double", e);
         }
     }
+
+    @Override
+    public IValueReference readBytes() {
+        try {
+            return in.readBytes(valueStorage, valueLength);
+        } catch (IOException e) {
+            throw new ParquetDecodingException("could not read bytes", e);
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
index 1b4611656a..4b09e4da21 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetDeltaByteArrayWriter.java
@@ -91,6 +91,8 @@ public class ParquetDeltaByteArrayWriter extends AbstractParquetValuesWriter {
         int start = value.getStartOffset();
         int length = value.getLength();
         if (skipLengthBytes) {
+            // Length bytes are skipped so that prefix encoding works properly: e.g., "123" and "1234" share
+            // the prefix "123", but their leading length bytes differ and would break that common prefix
             int lengthBytes = UTF8StringUtil.getNumBytesToStoreLength(bytes, start);
             start += lengthBytes;
             length -= lengthBytes;
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
similarity index 65%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
index 0298e596b0..2aba7d2e78 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainFixedLengthValuesWriter.java
@@ -19,31 +19,46 @@
 package org.apache.asterix.column.bytes.encoder;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
 import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.LittleEndianDataOutputStream;
 import org.apache.parquet.column.values.plain.PlainValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 
 /**
  * Re-implementation of {@link PlainValuesWriter}
  */
-public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
-    public static final Charset CHARSET = StandardCharsets.UTF_8;
-
+public class ParquetPlainFixedLengthValuesWriter extends AbstractParquetValuesWriter {
     private final AbstractBytesOutputStream outputStream;
-    private final LittleEndianDataOutputStream out;
+    private final ValueOutputStream out;
 
-    public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+    public ParquetPlainFixedLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
         outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
-        out = new LittleEndianDataOutputStream(outputStream);
+        out = new ValueOutputStream(outputStream);
+    }
+
+    @Override
+    public void writeInteger(int v) {
+        try {
+            out.writeInt(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write int", e);
+        }
+    }
+
+    @Override
+    public void writeLong(long v) {
+        try {
+            out.writeLong(v);
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write long", e);
+        }
     }
 
     @Override
@@ -55,6 +70,21 @@ public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
         }
     }
 
+    /**
+     * Should only be used for UUID
+     *
+     * @param v               the value to encode
+     * @param skipLengthBytes ignored
+     */
+    @Override
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
+        try {
+            out.write(v.getByteArray(), v.getStartOffset(), v.getLength());
+        } catch (IOException e) {
+            throw new ParquetEncodingException("could not write bytes", e);
+        }
+    }
+
     @Override
     public BytesInput getBytes() {
         try {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
similarity index 58%
rename from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
rename to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
index 0298e596b0..63697bc8c8 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/encoder/ParquetPlainVariableLengthValuesWriter.java
@@ -19,69 +19,69 @@
 package org.apache.asterix.column.bytes.encoder;
 
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
 
 import org.apache.asterix.column.bytes.stream.out.AbstractBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.GrowableBytesOutputStream;
 import org.apache.asterix.column.bytes.stream.out.MultiTemporaryBufferBytesOutputStream;
+import org.apache.asterix.column.bytes.stream.out.ValueOutputStream;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
 import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.LittleEndianDataOutputStream;
-import org.apache.parquet.column.values.plain.PlainValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 
-/**
- * Re-implementation of {@link PlainValuesWriter}
- */
-public class ParquetPlainValuesWriter extends AbstractParquetValuesWriter {
-    public static final Charset CHARSET = StandardCharsets.UTF_8;
-
-    private final AbstractBytesOutputStream outputStream;
-    private final LittleEndianDataOutputStream out;
+public class ParquetPlainVariableLengthValuesWriter extends AbstractParquetValuesWriter {
+    private final GrowableBytesOutputStream offsetStream;
+    private final AbstractBytesOutputStream valueStream;
+    private final ValueOutputStream offsetWriterStream;
 
-    public ParquetPlainValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
-        outputStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
-        out = new LittleEndianDataOutputStream(outputStream);
+    public ParquetPlainVariableLengthValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef) {
+        offsetStream = new GrowableBytesOutputStream();
+        valueStream = new MultiTemporaryBufferBytesOutputStream(multiPageOpRef);
+        offsetWriterStream = new ValueOutputStream(offsetStream);
     }
 
     @Override
-    public final void writeDouble(double v) {
+    public void writeBytes(IValueReference v, boolean skipLengthBytes) {
         try {
-            out.writeDouble(v);
+            offsetWriterStream.writeInt(valueStream.size());
+            valueStream.write(v);
         } catch (IOException e) {
-            throw new ParquetEncodingException("could not write double", e);
+            throw new ParquetEncodingException("could not write bytes", e);
         }
     }
 
     @Override
     public BytesInput getBytes() {
         try {
-            out.flush();
+            offsetStream.flush();
+            valueStream.flush();
         } catch (IOException e) {
             throw new ParquetEncodingException("could not write page", e);
         }
-        return outputStream.asBytesInput();
+        return BytesInput.concat(offsetStream.asBytesInput(), valueStream.asBytesInput());
     }
 
     @Override
     public void reset() throws HyracksDataException {
-        outputStream.reset();
+        offsetStream.reset();
+        valueStream.reset();
     }
 
     @Override
     public void close() {
-        outputStream.finish();
+        offsetStream.finish();
+        valueStream.finish();
     }
 
     @Override
     public int getEstimatedSize() {
-        return outputStream.size();
+        return offsetStream.size() + valueStream.size();
     }
 
     @Override
     public int getAllocatedSize() {
-        return outputStream.capacity();
+        return offsetStream.capacity() + valueStream.capacity(); // capacity (not written size) of both streams
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
index b50143bb7f..034df66321 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/AbstractBytesInputStream.java
@@ -61,6 +61,10 @@ public abstract class AbstractBytesInputStream extends InputStream {
     @Override
     public abstract int available();
 
+    public ByteBuffer getBuffer() {
+        throw new UnsupportedOperationException("Getting buffer is not supported");
+    }
+
     public final void skipFully(long n) throws IOException {
         long skipped = skip(n);
         if (skipped < n) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
index 833765c316..9c3dc08674 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ByteBufferInputStream.java
@@ -166,4 +166,9 @@ public final class ByteBufferInputStream extends AbstractBytesInputStream {
     public int available() {
         return buffer.remaining();
     }
+
+    @Override
+    public ByteBuffer getBuffer() {
+        return buffer;
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
new file mode 100644
index 0000000000..ee975f1f25
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/in/ValueInputStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.in;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+import org.apache.parquet.bytes.LittleEndianDataInputStream;
+
+/**
+ * Re-implementation of {@link LittleEndianDataInputStream}
+ */
+public final class ValueInputStream extends InputStream {
+    private final byte[] readBuffer;
+    private InputStream in;
+
+    public ValueInputStream() {
+        readBuffer = new byte[8];
+    }
+
+    public void reset(AbstractBytesInputStream in) {
+        this.in = in;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return in.read();
+    }
+
+    public int readInt() throws IOException {
+        readFully(readBuffer, Integer.BYTES);
+        return IntegerPointable.getInteger(readBuffer, 0);
+    }
+
+    public long readLong() throws IOException {
+        readFully(readBuffer, Long.BYTES);
+        return LongPointable.getLong(readBuffer, 0);
+    }
+
+    public double readDouble() throws IOException {
+        readFully(readBuffer, Double.BYTES);
+        return DoublePointable.getDouble(readBuffer, 0);
+    }
+
+    public IValueReference readBytes(IPointable valueStorage, int length) throws IOException {
+        readFully(valueStorage.getByteArray(), length);
+        return valueStorage;
+    }
+
+    public void skipBytes(int n) throws IOException {
+        int total = 0;
+        int cur;
+
+        while ((total < n) && ((cur = (int) in.skip(n - total)) > 0)) {
+            total += cur;
+        }
+    }
+
+    private void readFully(byte[] bytes, int len) throws IOException {
+        if (len < 0) {
+            throw new IndexOutOfBoundsException();
+        } else {
+            int count;
+            for (int n = 0; n < len; n += count) {
+                count = this.in.read(bytes, n, len - n);
+                if (count < 0) {
+                    throw new EOFException();
+                }
+            }
+
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
new file mode 100644
index 0000000000..a106a00a23
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/bytes/stream/out/ValueOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.bytes.stream.out;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.LongPointable;
+
+public final class ValueOutputStream extends OutputStream {
+    private final OutputStream out;
+    private final byte[] writeBuffer;
+
+    public ValueOutputStream(OutputStream out) {
+        this.out = out;
+        writeBuffer = new byte[8];
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        out.write(b);
+    }
+
+    public void writeInt(int value) throws IOException {
+        IntegerPointable.setInteger(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Integer.BYTES);
+    }
+
+    public void writeLong(long value) throws IOException {
+        LongPointable.setLong(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Long.BYTES);
+    }
+
+    public void writeDouble(double value) throws IOException {
+        DoublePointable.setDouble(writeBuffer, 0, value);
+        out.write(writeBuffer, 0, Double.BYTES);
+    }
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
index 2e60ee72c2..71d3ac6e33 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/ColumnAssembler.java
@@ -51,8 +51,8 @@ public final class ColumnAssembler {
         tupleIndex = 0;
     }
 
-    public void resetColumn(AbstractBytesInputStream stream, int startIndex, int ordinal) throws HyracksDataException {
-        assemblers.get(ordinal).reset(stream, startIndex, numberOfTuples);
+    public void resetColumn(AbstractBytesInputStream stream, int ordinal) throws HyracksDataException {
+        assemblers.get(ordinal).reset(stream, numberOfTuples);
     }
 
     public int getColumnIndex(int ordinal) {
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
index f2407d3cc7..b9babf1838 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnMetadata.java
@@ -31,8 +31,8 @@ import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -57,14 +57,14 @@ import org.apache.logging.log4j.Logger;
 public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
     private static final Logger LOGGER = LogManager.getLogger();
     private final FieldNamesDictionary fieldNamesDictionary;
-    private final IColumnValuesReader[] primaryKeyReaders;
+    private final PrimitiveColumnValuesReader[] primaryKeyReaders;
     private final IColumnFilterEvaluator filterEvaluator;
     private final List<IColumnFilterValueAccessor> filterValueAccessors;
 
     protected final ColumnAssembler assembler;
 
     protected QueryColumnMetadata(ARecordType datasetType, ARecordType metaType,
-            IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+            PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, IColumnValuesReaderFactory readerFactory,
             IValueGetterFactory valueGetterFactory, IColumnFilterEvaluator filterEvaluator,
             List<IColumnFilterValueAccessor> filterValueAccessors) throws HyracksDataException {
@@ -84,7 +84,7 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
         return fieldNamesDictionary;
     }
 
-    public final IColumnValuesReader[] getPrimaryKeyReaders() {
+    public final PrimitiveColumnValuesReader[] getPrimaryKeyReaders() {
         return primaryKeyReaders;
     }
 
@@ -169,7 +169,8 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
         IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
         List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
 
-        IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders =
+                createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
 
         if (LOGGER.isInfoEnabled() && filterEvaluator != TrueColumnFilterEvaluator.INSTANCE) {
             String filterString = filterEvaluator == FalseColumnFilterEvaluator.INSTANCE ? "SKIP_ALL"
@@ -192,14 +193,14 @@ public class QueryColumnMetadata extends AbstractColumnImmutableReadMetadata {
         return clippedRoot;
     }
 
-    protected static IColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
+    protected static PrimitiveColumnValuesReader[] createPrimaryKeyReaders(DataInput input,
             IColumnValuesReaderFactory readerFactory, int numberOfPrimaryKeys) throws IOException {
         //skip number of columns
         input.readInt();
 
-        IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
+        PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
-            primaryKeyReaders[i] = readerFactory.createValueReader(input);
+            primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readerFactory.createValueReader(input);
         }
         return primaryKeyReaders;
     }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
index 6f0c9748ca..2de1948b81 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/query/QueryColumnWithMetaMetadata.java
@@ -30,8 +30,8 @@ import org.apache.asterix.column.metadata.FieldNamesDictionary;
 import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
 import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
 import org.apache.asterix.column.metadata.schema.visitor.SchemaClipperVisitor;
-import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluatorFactory;
@@ -51,7 +51,7 @@ public final class QueryColumnWithMetaMetadata extends QueryColumnMetadata {
     private final ColumnAssembler metaAssembler;
 
     private QueryColumnWithMetaMetadata(ARecordType datasetType, ARecordType metaType,
-            IColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
+            PrimitiveColumnValuesReader[] primaryKeyReaders, IValueReference serializedMetadata,
             FieldNamesDictionary fieldNamesDictionary, ObjectSchemaNode root, ObjectSchemaNode metaRoot,
             IColumnValuesReaderFactory readerFactory, IValueGetterFactory valueGetterFactory,
             IColumnFilterEvaluator filterEvaluator, List<IColumnFilterValueAccessor> filterValueAccessors)
@@ -141,7 +141,8 @@ public final class QueryColumnWithMetaMetadata extends QueryColumnMetadata {
         IColumnFilterEvaluator filterEvaluator = filterEvaluatorFactory.create(filterAccessorProvider);
         List<IColumnFilterValueAccessor> filterValueAccessors = filterAccessorProvider.getFilterAccessors();
 
-        IColumnValuesReader[] primaryKeyReaders = createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders =
+                createPrimaryKeyReaders(input, readerFactory, numberOfPrimaryKeys);
 
         return new QueryColumnWithMetaMetadata(datasetType, metaType, primaryKeyReaders, serializedMetadata,
                 fieldNamesDictionary, clippedRoot, metaClippedRoot, readerFactory, valueGetterFactory, filterEvaluator,
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
index df6b554111..01e974243c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -18,25 +18,33 @@
  */
 package org.apache.asterix.column.tuple;
 
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.ERROR_INDICATOR;
+import static org.apache.hyracks.storage.am.common.frames.AbstractSlotManager.GREATEST_KEY_INDICATOR;
+
 import org.apache.asterix.column.assembler.value.IValueGetter;
 import org.apache.asterix.column.assembler.value.ValueGetterFactory;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
 import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
 import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
 
 public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
     private final IValueGetter[] primaryKeysValueGetters;
     protected final ByteBufferInputStream[] primaryKeyStreams;
-    protected final IColumnValuesReader[] primaryKeyReaders;
+    protected final PrimitiveColumnValuesReader[] primaryKeyReaders;
     protected final VoidPointable[] primaryKeys;
     protected final AbstractBytesInputStream[] columnStreams;
 
@@ -67,16 +75,24 @@ public abstract class AbstractAsterixColumnTupleReference extends AbstractColumn
         }
     }
 
-    protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+    protected abstract PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+
+    @Override
+    protected void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException {
+        for (int i = 0; i < primaryKeyReaders.length; i++) {
+            PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+            reader.reset(index, skipCount);
+            primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+        }
+    }
 
     @Override
-    protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException {
+    protected final void startPrimaryKey(IColumnBufferProvider provider, int ordinal, int numberOfTuples)
+            throws HyracksDataException {
         ByteBufferInputStream primaryKeyStream = primaryKeyStreams[ordinal];
         primaryKeyStream.reset(provider);
         IColumnValuesReader reader = primaryKeyReaders[ordinal];
         reader.reset(primaryKeyStream, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
@@ -137,4 +153,79 @@ public abstract class AbstractAsterixColumnTupleReference extends AbstractColumn
         }
         return compare;
     }
+
+    @Override
+    public int findTupleIndex(ITupleReference searchKey, MultiComparator comparator, FindTupleMode mode,
+            FindTupleNoExactMatchPolicy matchPolicy) throws HyracksDataException {
+        int tupleCount = getTupleCount();
+        if (tupleCount <= 0) {
+            return GREATEST_KEY_INDICATOR;
+        }
+
+        int mid;
+        int begin = tupleIndex;
+        int end = tupleCount - 1;
+
+        while (begin <= end) {
+            mid = (begin + end) / 2;
+
+            setKeyAt(mid);
+            int cmp = comparator.compare(searchKey, this);
+            if (cmp < 0) {
+                end = mid - 1;
+            } else if (cmp > 0) {
+                begin = mid + 1;
+            } else {
+                if (mode == FindTupleMode.EXCLUSIVE) {
+                    if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+                        begin = mid + 1;
+                    } else {
+                        end = mid - 1;
+                    }
+                } else {
+                    if (mode == FindTupleMode.EXCLUSIVE_ERROR_IF_EXISTS) {
+                        return ERROR_INDICATOR;
+                    } else {
+                        return mid;
+                    }
+                }
+            }
+        }
+
+        if (mode == FindTupleMode.EXACT) {
+            return ERROR_INDICATOR;
+        }
+
+        if (matchPolicy == FindTupleNoExactMatchPolicy.HIGHER_KEY) {
+            if (begin > tupleCount - 1) {
+                return GREATEST_KEY_INDICATOR;
+            }
+
+            setKeyAt(begin);
+            if (comparator.compare(searchKey, this) < 0) {
+                return begin;
+            } else {
+                return GREATEST_KEY_INDICATOR;
+            }
+        } else {
+            if (end < 0) {
+                return GREATEST_KEY_INDICATOR;
+            }
+
+            setKeyAt(end);
+            if (comparator.compare(searchKey, this) > 0) {
+                return end;
+            } else {
+                return GREATEST_KEY_INDICATOR;
+            }
+        }
+    }
+
+    protected void setKeyAt(int index) {
+        for (int i = 0; i < primaryKeyReaders.length; i++) {
+            PrimitiveColumnValuesReader reader = primaryKeyReaders[i];
+            reader.getValue(index);
+            primaryKeys[i].set(primaryKeysValueGetters[i].getValue(reader));
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
index c10d41550c..56b0a570f6 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -24,6 +24,7 @@ import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
 import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
 import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
 import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
@@ -43,11 +44,14 @@ public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleR
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         MergeColumnReadMetadata columnMetadata = (MergeColumnReadMetadata) info;
         int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
-        IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
-        System.arraycopy(columnMetadata.getColumnReaders(), 0, primaryKeyReaders, 0, numberOfPrimaryKeys);
+        PrimitiveColumnValuesReader[] primaryKeyReaders = new PrimitiveColumnValuesReader[numberOfPrimaryKeys];
+        IColumnValuesReader[] readers = columnMetadata.getColumnReaders();
+        for (int i = 0; i < numberOfPrimaryKeys; i++) {
+            primaryKeyReaders[i] = (PrimitiveColumnValuesReader) readers[i];
+        }
         return primaryKeyReaders;
     }
 
@@ -55,14 +59,15 @@ public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleR
     protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
         //Skip filters
         pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
-        skipCount = 0;
+        // skip count always starts from zero as no "search" is conducted during a merge
+        this.skipCount = 0;
         return true;
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
-        int numberOfPrimaryKeys = primaryKeys.length;
+        int numberOfPrimaryKeys = primaryKeyStreams.length;
         if (ordinal < numberOfPrimaryKeys) {
             //Skip primary key
             return;
@@ -71,7 +76,6 @@ public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleR
         columnStream.reset(buffersProvider);
         IColumnValuesReader reader = columnReaders[ordinal];
         reader.reset(columnStream, numberOfTuples);
-        reader.skip(startIndex);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
index e286235462..b70bddc3e2 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnTupleReference.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.operation.query.ColumnAssembler;
 import org.apache.asterix.column.operation.query.QueryColumnMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -50,7 +50,7 @@ public final class QueryColumnTupleReference extends AbstractAsterixColumnTupleR
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
     }
 
@@ -69,11 +69,11 @@ public final class QueryColumnTupleReference extends AbstractAsterixColumnTupleR
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
         AbstractBytesInputStream columnStream = columnStreams[ordinal];
         columnStream.reset(buffersProvider);
-        assembler.resetColumn(columnStream, startIndex, ordinal);
+        assembler.resetColumn(columnStream, ordinal);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
index a5cedc1aac..5b7794e7de 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/QueryColumnWithMetaTupleReference.java
@@ -25,7 +25,7 @@ import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.column.operation.query.ColumnAssembler;
 import org.apache.asterix.column.operation.query.QueryColumnMetadata;
 import org.apache.asterix.column.operation.query.QueryColumnWithMetaMetadata;
-import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.reader.PrimitiveColumnValuesReader;
 import org.apache.asterix.column.values.reader.filter.FilterAccessorProvider;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterEvaluator;
 import org.apache.asterix.column.values.reader.filter.IColumnFilterValueAccessor;
@@ -53,7 +53,7 @@ public final class QueryColumnWithMetaTupleReference extends AbstractAsterixColu
     }
 
     @Override
-    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+    protected PrimitiveColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
         return ((QueryColumnMetadata) info).getPrimaryKeyReaders();
     }
 
@@ -73,15 +73,15 @@ public final class QueryColumnWithMetaTupleReference extends AbstractAsterixColu
     }
 
     @Override
-    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+    protected void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
             throws HyracksDataException {
         AbstractBytesInputStream columnStream = columnStreams[ordinal];
         columnStream.reset(buffersProvider);
         int metaColumnCount = metaAssembler.getNumberOfColumns();
         if (ordinal >= metaColumnCount) {
-            assembler.resetColumn(columnStream, startIndex, ordinal - metaColumnCount);
+            assembler.resetColumn(columnStream, ordinal - metaColumnCount);
         } else {
-            metaAssembler.resetColumn(columnStream, startIndex, ordinal);
+            metaAssembler.resetColumn(columnStream, ordinal);
         }
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
new file mode 100644
index 0000000000..4ee2780b25
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnKeyValueReader.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+/**
+ * Accessor for key columns
+ */
+public interface IColumnKeyValueReader {
+    /**
+     * Reset the reader at the given index
+     *
+     * @param startIndex start index
+     */
+    void reset(int startIndex, int skipCount) throws HyracksDataException;
+
+    /**
+     * Returns the value of the key at the given index
+     *
+     * @param index tuple index
+     * @return the key value
+     */
+    IValueReference getValue(int index);
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
index 0f4cc0c620..fe23b23233 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/IColumnValuesReader.java
@@ -104,7 +104,7 @@ public interface IColumnValuesReader extends Comparable<IColumnValuesReader> {
     IValueReference getBytes();
 
     /* ***********************
-     * Write function
+     * Write functions
      * ***********************
      */
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
index c0cf18a6a1..41dfea2908 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/AbstractColumnValuesReader.java
@@ -59,6 +59,8 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader {
         if (allMissing) {
             return;
         }
+
+        valueIndex++;
         try {
             int actualLevel = definitionLevels.readInt();
             //Check whether the level is for a null value
@@ -90,7 +92,7 @@ abstract class AbstractColumnValuesReader implements IColumnValuesReader {
             valuesStream.resetAt(defLevelsSize, in);
             int valueLength = BytesUtils.readZigZagVarInt(valuesStream);
             if (valueLength > 0) {
-                valueReader.resetValue(valuesStream);
+                valueReader.init(valuesStream, tupleCount);
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
index b233482e31..bf80580093 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/ColumnValueReaderFactory.java
@@ -30,17 +30,21 @@ import org.apache.asterix.column.values.reader.value.LongValueReader;
 import org.apache.asterix.column.values.reader.value.NoOpValueReader;
 import org.apache.asterix.column.values.reader.value.StringValueReader;
 import org.apache.asterix.column.values.reader.value.UUIDValueReader;
+import org.apache.asterix.column.values.reader.value.key.DoubleKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.LongKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.StringKeyValueReader;
+import org.apache.asterix.column.values.reader.value.key.UUIDKeyValueReader;
 import org.apache.asterix.om.types.ATypeTag;
 
 public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
     @Override
     public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, boolean primaryKey) {
-        return new PrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, primaryKey);
+        return new PrimitiveColumnValuesReader(createReader(typeTag, primaryKey), columnIndex, maxLevel, primaryKey);
     }
 
     @Override
     public IColumnValuesReader createValueReader(ATypeTag typeTag, int columnIndex, int maxLevel, int[] delimiters) {
-        return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag), columnIndex, maxLevel, delimiters);
+        return new RepeatedPrimitiveColumnValuesReader(createReader(typeTag, false), columnIndex, maxLevel, delimiters);
     }
 
     @Override
@@ -60,7 +64,7 @@ public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
         return createValueReader(typeTag, columnIndex, maxLevel, primaryKey);
     }
 
-    private AbstractValueReader createReader(ATypeTag typeTag) {
+    private AbstractValueReader createReader(ATypeTag typeTag, boolean primaryKey) {
         switch (typeTag) {
             case MISSING:
             case NULL:
@@ -68,13 +72,13 @@ public class ColumnValueReaderFactory implements IColumnValuesReaderFactory {
             case BOOLEAN:
                 return new BooleanValueReader();
             case BIGINT:
-                return new LongValueReader();
+                return primaryKey ? new LongKeyValueReader() : new LongValueReader();
             case DOUBLE:
-                return new DoubleValueReader();
+                return primaryKey ? new DoubleKeyValueReader() : new DoubleValueReader();
             case STRING:
-                return new StringValueReader();
+                return primaryKey ? new StringKeyValueReader() : new StringValueReader();
             case UUID:
-                return new UUIDValueReader();
+                return primaryKey ? new UUIDKeyValueReader() : new UUIDValueReader();
             default:
                 throw new UnsupportedOperationException(typeTag + " is not supported");
         }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
index e8c7bc5943..7ae295437f 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/PrimitiveColumnValuesReader.java
@@ -20,14 +20,16 @@ package org.apache.asterix.column.values.reader;
 
 import java.io.IOException;
 
+import org.apache.asterix.column.values.IColumnKeyValueReader;
 import org.apache.asterix.column.values.IColumnValuesWriter;
 import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 
 /**
  * Reader for a non-repeated primitive value
  */
-public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader {
+public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReader implements IColumnKeyValueReader {
     /**
      * A primary key value is always present. Anti-matter can be determined by checking whether the definition level
      * indicates that the tuple's values are missing (i.e., by calling {@link #isMissing()}).
@@ -49,7 +51,6 @@ public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReade
         if (valueIndex == valueCount) {
             return false;
         }
-        valueIndex++;
 
         try {
             nextLevel();
@@ -92,4 +93,18 @@ public final class PrimitiveColumnValuesReader extends AbstractColumnValuesReade
             }
         }
     }
+
+    @Override
+    public IValueReference getValue(int index) {
+        return ((IColumnKeyValueReader) valueReader).getValue(index);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) throws HyracksDataException {
+        ((IColumnKeyValueReader) valueReader).reset(startIndex, skipCount);
+        nextLevel();
+        for (int i = 1; i < skipCount; i++) {
+            nextLevel();
+        }
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
index 673aa98bd7..1cd424bf4a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/RepeatedPrimitiveColumnValuesReader.java
@@ -66,7 +66,6 @@ public final class RepeatedPrimitiveColumnValuesReader extends AbstractColumnVal
         if (level == maxLevel) {
             valueReader.nextValue();
         }
-        valueIndex++;
         return true;
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
index 3d4c744c5f..4db082fc00 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/AbstractValueReader.java
@@ -27,7 +27,7 @@ import org.apache.hyracks.data.std.api.IValueReference;
 
 public abstract class AbstractValueReader implements Comparable<AbstractValueReader> {
 
-    public abstract void resetValue(AbstractBytesInputStream in) throws IOException;
+    public abstract void init(AbstractBytesInputStream in, int tupleCount) throws IOException;
 
     public abstract void nextValue() throws HyracksDataException;
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
index 34177735c0..6b5e0d4bd7 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/BooleanValueReader.java
@@ -34,7 +34,7 @@ public final class BooleanValueReader extends AbstractValueReader {
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) {
+    public void init(AbstractBytesInputStream in, int tupleCount) {
         booleanReader.reset(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
index 24155f2fdb..faa60d1879 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
@@ -20,20 +20,20 @@ package org.apache.asterix.column.values.reader.value;
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.decoder.ParquetDoublePlainValuesReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.om.types.ATypeTag;
 
 public final class DoubleValueReader extends AbstractValueReader {
-    private final ParquetDoublePlainValuesReader doubleReader;
+    private final ParquetPlainFixedLengthValuesReader doubleReader;
     private double nextValue;
 
     public DoubleValueReader() {
-        doubleReader = new ParquetDoublePlainValuesReader();
+        doubleReader = new ParquetPlainFixedLengthValuesReader(Double.BYTES);
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         doubleReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
index 09413d9711..c22687e03c 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
@@ -33,7 +33,7 @@ public final class LongValueReader extends AbstractValueReader {
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         longReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
index fd56ff2cb5..1982c54c91 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/NoOpValueReader.java
@@ -31,7 +31,7 @@ public class NoOpValueReader extends AbstractValueReader {
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         throw new UnsupportedOperationException(getClass().getName());
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
index 8fd887415e..19da3dd38a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/StringValueReader.java
@@ -35,7 +35,7 @@ public final class StringValueReader extends AbstractValueReader {
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         stringReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
index 4f240e9cff..75179601ee 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
@@ -20,22 +20,24 @@ package org.apache.asterix.column.values.reader.value;
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
+import org.apache.asterix.column.bytes.decoder.ParquetPlainFixedLengthValuesReader;
 import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 public final class UUIDValueReader extends AbstractValueReader {
-    private final ParquetDeltaByteArrayReader uuidReader;
+    private final ParquetPlainFixedLengthValuesReader uuidReader;
     private IValueReference nextValue;
 
     public UUIDValueReader() {
-        uuidReader = new ParquetDeltaByteArrayReader(false);
+        ArrayBackedValueStorage storage = new ArrayBackedValueStorage(16);
+        uuidReader = new ParquetPlainFixedLengthValuesReader(storage);
     }
 
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
         uuidReader.initFromPage(in);
     }
 
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
new file mode 100644
index 0000000000..fc458f3298
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/AbstractFixedLengthColumnKeyValueReader.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+abstract class AbstractFixedLengthColumnKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+    protected final IPointable value;
+    private ByteBuffer buffer;
+    private int startOffset;
+
+    AbstractFixedLengthColumnKeyValueReader() {
+        value = new VoidPointable();
+    }
+
+    @Override
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+        buffer = in.getBuffer();
+        startOffset = buffer.position();
+        value.set(null, 0, 0);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) {
+        getValue(startIndex);
+    }
+
+    @Override
+    public IValueReference getValue(int index) {
+        int valueLength = getValueLength();
+        int offset = startOffset + index * valueLength;
+        value.set(buffer.array(), offset, valueLength);
+        return value;
+    }
+
+    @Override
+    public void nextValue() {
+        if (value.getByteArray() == null) {
+            getValue(0);
+            return;
+        }
+        int valueLength = getValueLength();
+        int offset = value.getStartOffset() + valueLength;
+        value.set(buffer.array(), offset, valueLength);
+    }
+
+    protected abstract int getValueLength();
+
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
similarity index 57%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
index 24155f2fdb..cfb8af43f1 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/DoubleValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/DoubleKeyValueReader.java
@@ -16,44 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.column.values.reader.value;
+package org.apache.asterix.column.values.reader.value.key;
 
-import java.io.IOException;
-
-import org.apache.asterix.column.bytes.decoder.ParquetDoublePlainValuesReader;
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
 
-public final class DoubleValueReader extends AbstractValueReader {
-    private final ParquetDoublePlainValuesReader doubleReader;
-    private double nextValue;
-
-    public DoubleValueReader() {
-        doubleReader = new ParquetDoublePlainValuesReader();
-    }
-
+public final class DoubleKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
-        doubleReader.initFromPage(in);
+    public ATypeTag getTypeTag() {
+        return ATypeTag.DOUBLE;
     }
 
     @Override
-    public void nextValue() {
-        nextValue = doubleReader.readDouble();
+    protected int getValueLength() {
+        return Double.BYTES;
     }
 
     @Override
     public double getDouble() {
-        return nextValue;
-    }
-
-    @Override
-    public ATypeTag getTypeTag() {
-        return ATypeTag.DOUBLE;
+        return DoublePointable.getDouble(value.getByteArray(), value.getStartOffset());
     }
 
     @Override
     public int compareTo(AbstractValueReader o) {
-        return Double.compare(nextValue, o.getDouble());
+        return Double.compare(getDouble(), o.getDouble());
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
similarity index 57%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
index 09413d9711..a981dca644 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/LongValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/LongKeyValueReader.java
@@ -16,44 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.column.values.reader.value;
+package org.apache.asterix.column.values.reader.value.key;
 
-import java.io.IOException;
-
-import org.apache.asterix.column.bytes.decoder.ParquetDeltaBinaryPackingValuesReader;
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.LongPointable;
 
-public final class LongValueReader extends AbstractValueReader {
-    private final ParquetDeltaBinaryPackingValuesReader longReader;
-    private long nextValue;
-
-    public LongValueReader() {
-        longReader = new ParquetDeltaBinaryPackingValuesReader();
-    }
-
+public final class LongKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
-        longReader.initFromPage(in);
+    public ATypeTag getTypeTag() {
+        return ATypeTag.BIGINT;
     }
 
     @Override
-    public void nextValue() {
-        nextValue = longReader.readLong();
+    protected int getValueLength() {
+        return Long.BYTES;
     }
 
     @Override
     public long getLong() {
-        return nextValue;
-    }
-
-    @Override
-    public ATypeTag getTypeTag() {
-        return ATypeTag.BIGINT;
+        return LongPointable.getLong(value.getByteArray(), value.getStartOffset());
     }
 
     @Override
     public int compareTo(AbstractValueReader o) {
-        return Long.compare(nextValue, o.getLong());
+        return Long.compare(getLong(), o.getLong());
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
new file mode 100644
index 0000000000..d6d7c4c827
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/StringKeyValueReader.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.values.reader.value.key;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnKeyValueReader;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+
+public final class StringKeyValueReader extends AbstractValueReader implements IColumnKeyValueReader {
+    private final IPointable value;
+    private ByteBuffer buffer;
+    private int startOffset;
+    private int tupleCount;
+
+    public StringKeyValueReader() {
+        value = new VoidPointable();
+    }
+
+    @Override
+    public void init(AbstractBytesInputStream in, int tupleCount) throws IOException {
+        buffer = in.getBuffer();
+        startOffset = buffer.position();
+        this.tupleCount = tupleCount;
+        value.set(null, 0, 0);
+    }
+
+    @Override
+    public void reset(int startIndex, int skipCount) {
+        getValue(startIndex);
+    }
+
+    @Override
+    public IValueReference getValue(int index) {
+        byte[] bytes = buffer.array();
+        int indexOffset = startOffset + index * Integer.BYTES;
+        int valueOffset = startOffset + tupleCount * Integer.BYTES + IntegerPointable.getInteger(bytes, indexOffset);
+        int valueLength = UTF8StringUtil.getUTFLength(bytes, valueOffset);
+        valueLength += UTF8StringUtil.getNumBytesToStoreLength(valueLength);
+        value.set(bytes, valueOffset, valueLength);
+        return value;
+    }
+
+    @Override
+    public IValueReference getBytes() {
+        return value;
+    }
+
+    @Override
+    public void nextValue() {
+        if (value.getByteArray() == null) {
+            getValue(0);
+            return;
+        }
+        int offset = value.getStartOffset() + value.getLength();
+        int length = UTF8StringUtil.getUTFLength(buffer.array(), offset);
+        length += UTF8StringUtil.getNumBytesToStoreLength(length);
+        value.set(buffer.array(), offset, length);
+    }
+
+    @Override
+    public ATypeTag getTypeTag() {
+        return ATypeTag.STRING;
+    }
+
+    @Override
+    public int compareTo(AbstractValueReader o) {
+        return UTF8StringPointable.compare(getBytes(), o.getBytes());
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
similarity index 60%
copy from asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
index 4f240e9cff..141e9f8369 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/UUIDValueReader.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/reader/value/key/UUIDKeyValueReader.java
@@ -16,46 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.column.values.reader.value;
+package org.apache.asterix.column.values.reader.value.key;
 
-import java.io.IOException;
-
-import org.apache.asterix.column.bytes.decoder.ParquetDeltaByteArrayReader;
-import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.reader.value.AbstractValueReader;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AUUIDPartialBinaryComparatorFactory;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.data.std.api.IValueReference;
 
-public final class UUIDValueReader extends AbstractValueReader {
-    private final ParquetDeltaByteArrayReader uuidReader;
-    private IValueReference nextValue;
-
-    public UUIDValueReader() {
-        uuidReader = new ParquetDeltaByteArrayReader(false);
-    }
-
+public final class UUIDKeyValueReader extends AbstractFixedLengthColumnKeyValueReader {
     @Override
-    public void resetValue(AbstractBytesInputStream in) throws IOException {
-        uuidReader.initFromPage(in);
+    public ATypeTag getTypeTag() {
+        return ATypeTag.UUID;
     }
 
     @Override
-    public void nextValue() {
-        nextValue = uuidReader.readBytes();
+    protected int getValueLength() {
+        return 16;
     }
 
     @Override
     public IValueReference getBytes() {
-        return nextValue;
-    }
-
-    @Override
-    public ATypeTag getTypeTag() {
-        return ATypeTag.UUID;
+        return value;
     }
 
     @Override
     public int compareTo(AbstractValueReader o) {
-        return AUUIDPartialBinaryComparatorFactory.compare(nextValue, o.getBytes());
+        return AUUIDPartialBinaryComparatorFactory.compare(getBytes(), o.getBytes());
     }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
index ca5cbb1d28..5963d23181 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/DoubleColumnValuesWriter.java
@@ -20,7 +20,7 @@ package org.apache.asterix.column.values.writer;
 
 import java.io.IOException;
 
-import org.apache.asterix.column.bytes.encoder.ParquetPlainValuesWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.DoubleColumnFilterWriter;
@@ -37,12 +37,12 @@ import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageO
 import org.apache.parquet.bytes.BytesInput;
 
 public final class DoubleColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetPlainValuesWriter doubleWriter;
+    private final ParquetPlainFixedLengthValuesWriter doubleWriter;
 
     public DoubleColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
         super(columnIndex, level, collection, filtered);
-        doubleWriter = new ParquetPlainValuesWriter(multiPageOpRef);
+        doubleWriter = new ParquetPlainFixedLengthValuesWriter(multiPageOpRef);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
index e71ec734cb..e6ada5555a 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/LongColumnValuesWriter.java
@@ -20,7 +20,9 @@ package org.apache.asterix.column.values.writer;
 
 import java.io.IOException;
 
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
 import org.apache.asterix.column.bytes.encoder.ParquetDeltaBinaryPackingValuesWriterForLong;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.LongColumnFilterWriter;
@@ -35,12 +37,13 @@ import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageO
 import org.apache.parquet.bytes.BytesInput;
 
 final class LongColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetDeltaBinaryPackingValuesWriterForLong longWriter;
+    private final AbstractParquetValuesWriter longWriter;
 
     public LongColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
         super(columnIndex, level, collection, filtered);
-        longWriter = new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
+        longWriter = !filtered ? new ParquetPlainFixedLengthValuesWriter(multiPageOpRef)
+                : new ParquetDeltaBinaryPackingValuesWriterForLong(multiPageOpRef);
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
index e1a3ffdb49..b0d5a93195 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/StringColumnValuesWriter.java
@@ -20,7 +20,9 @@ package org.apache.asterix.column.values.writer;
 
 import java.io.IOException;
 
+import org.apache.asterix.column.bytes.encoder.AbstractParquetValuesWriter;
 import org.apache.asterix.column.bytes.encoder.ParquetDeltaByteArrayWriter;
+import org.apache.asterix.column.bytes.encoder.ParquetPlainVariableLengthValuesWriter;
 import org.apache.asterix.column.values.IColumnValuesReader;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.StringColumnFilterWriter;
@@ -32,18 +34,19 @@ import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageO
 import org.apache.parquet.bytes.BytesInput;
 
 public class StringColumnValuesWriter extends AbstractColumnValuesWriter {
-    private final ParquetDeltaByteArrayWriter stringWriter;
+    private final AbstractParquetValuesWriter stringWriter;
     private final boolean skipLengthBytes;
 
     public StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
-        this(multiPageOpRef, columnIndex, level, collection, filtered, true);
+        this(columnIndex, level, collection, filtered, true, filtered ? new ParquetDeltaByteArrayWriter(multiPageOpRef)
+                : new ParquetPlainVariableLengthValuesWriter(multiPageOpRef));
     }
 
-    protected StringColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
-            boolean collection, boolean filtered, boolean skipLengthBytes) {
+    protected StringColumnValuesWriter(int columnIndex, int level, boolean collection, boolean filtered,
+            boolean skipLengthBytes, AbstractParquetValuesWriter stringWriter) {
         super(columnIndex, level, collection, filtered);
-        stringWriter = new ParquetDeltaByteArrayWriter(multiPageOpRef);
+        this.stringWriter = stringWriter;
         this.skipLengthBytes = skipLengthBytes;
     }
 
@@ -94,4 +97,5 @@ public class StringColumnValuesWriter extends AbstractColumnValuesWriter {
     protected ATypeTag getTypeTag() {
         return ATypeTag.STRING;
     }
+
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
index 1e98754e98..9d4ff9a523 100644
--- a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/values/writer/UUIDColumnValuesWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.column.values.writer;
 
+import org.apache.asterix.column.bytes.encoder.ParquetPlainFixedLengthValuesWriter;
 import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
 import org.apache.asterix.column.values.writer.filters.UUIDColumnFilterWriter;
 import org.apache.asterix.om.types.ATypeTag;
@@ -28,7 +29,8 @@ final class UUIDColumnValuesWriter extends StringColumnValuesWriter {
 
     public UUIDColumnValuesWriter(Mutable<IColumnWriteMultiPageOp> multiPageOpRef, int columnIndex, int level,
             boolean collection, boolean filtered) {
-        super(multiPageOpRef, columnIndex, level, collection, filtered, false);
+        // UUID is always written without encoding
+        super(columnIndex, level, collection, filtered, false, new ParquetPlainFixedLengthValuesWriter(multiPageOpRef));
     }
 
     @Override
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
index 03c14aea17..36ebab9b75 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/bytes/AbstractBytesTest.java
@@ -271,7 +271,7 @@ public abstract class AbstractBytesTest extends TestBase {
                 LOGGER.info("READ PageZero {}", pageNumber++);
                 assembler.reset(prepareRead(pageZero, providers, streams));
                 for (int i = 0; i < streams.length; i++) {
-                    assembler.resetColumn(streams[i], 0, i);
+                    assembler.resetColumn(streams[i], i);
                 }
                 writeForPageZero(ps, assembler);
             }
diff --git a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
index 8f53ebcf52..ff088d696c 100644
--- a/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
+++ b/asterixdb/asterix-column/src/test/java/org/apache/asterix/column/test/dummy/AssemblerTest.java
@@ -125,7 +125,7 @@ public class AssemblerTest extends AbstractDummyTest {
             Pair<PrintStream, ATypeTag> pair = new Pair<>(ps, ATypeTag.OBJECT);
             assembler.reset(numberOfTuples);
             for (int i = 0; i < columnMetadata.getNumberOfColumns(); i++) {
-                assembler.resetColumn(streams[i], 0, i);
+                assembler.resetColumn(streams[i], i);
             }
             while (assembler.hasNext()) {
                 IValueReference record = assembler.nextValue();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
index 0c955002fa..4034906f25 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/api/IColumnTupleIterator.java
@@ -19,9 +19,13 @@
 package org.apache.hyracks.storage.am.lsm.btree.column.api;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
+import org.apache.hyracks.storage.common.MultiComparator;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.ICachedPage;
 
@@ -30,12 +34,39 @@ import org.apache.hyracks.storage.common.buffercache.ICachedPage;
  * set that could span multiple pages.
  */
 public interface IColumnTupleIterator extends ILSMTreeTupleReference, Comparable<IColumnTupleIterator> {
+    /**
+     * Indicates a new page was set to prepare the iterator
+     */
+    void newPage() throws HyracksDataException;
+
     /**
      * Reset the iterator starting at the provided index
      *
      * @param startIndex start from the tuple at this index
+     * @param endIndex   stop at this index (exclusive)
+     */
+    void reset(int startIndex, int endIndex) throws HyracksDataException;
+
+    /**
+     * Set the iterator at a new position
+     * NOTE:
+     * the new start index has to be greater than the current tuple index
+     *
+     * @param startIndex the new index to start from
+     */
+    void setAt(int startIndex) throws HyracksDataException;
+
+    /**
+     * Finds the tuple index given the search key
+     *
+     * @param searchKey search key
+     * @param cmp       comparator
+     * @param ftm       find tuple mode
+     * @param ftp       find tuple policy
+     * @return index of the tuple
      */
-    void reset(int startIndex) throws HyracksDataException;
+    int findTupleIndex(ITupleReference searchKey, MultiComparator cmp, FindTupleMode ftm,
+            FindTupleNoExactMatchPolicy ftp) throws HyracksDataException;
 
     /**
      * Mark {@link IColumnTupleIterator} as consumed
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
index 683e0995da..db878c41c4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreePointSearchCursor.java
@@ -20,7 +20,10 @@ package org.apache.hyracks.storage.am.lsm.btree.column.impls.btree;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.btree.api.IDiskBTreeStatefulPointSearchCursor;
+import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.common.IIndexCursorStats;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
@@ -56,7 +59,29 @@ public class ColumnBTreePointSearchCursor extends ColumnBTreeRangeSearchCursor
 
     @Override
     public void setCursorToNextKey(ISearchPredicate searchPred) throws HyracksDataException {
-        initCursorPosition(searchPred);
+        int index = getLowKeyIndex(); // NOTE(review): searchPred is not read here; the lookup uses the stored pred
+        if (index == frame.getTupleCount()) { // getLowKeyIndex() reports tupleCount when findTupleIndex was negative
+            frameTuple.consume();
+            yieldFirstCall = false;
+            return;
+        }
+        frameTuple.setAt(index); // position frameTuple at the index that was found
+        yieldFirstCall = true;
+    }
+
+    @Override
+    protected void setSearchPredicate(ISearchPredicate searchPred) {
+        pred = (RangePredicate) searchPred;
+        lowKey = pred.getLowKey(); // only the low key is captured; highKey is not assigned in this override
+        lowKeyFtm = FindTupleMode.EXACT;
+        lowKeyFtp = FindTupleNoExactMatchPolicy.NONE; // getLowKeyIndex() maps a negative result to "no match"
+        reusablePredicate.setLowKeyComparator(originalKeyCmp);
+    }
+
+    @Override
+    protected int getLowKeyIndex() throws HyracksDataException {
+        int index = frameTuple.findTupleIndex(pred.getLowKey(), pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+        return index < 0 ? frame.getTupleCount() : index; // negative result => tupleCount as the "no match" sentinel
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
index 33234944cb..e618aaa393 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/btree/ColumnBTreeRangeSearchCursor.java
@@ -24,6 +24,8 @@ import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 import org.apache.hyracks.storage.am.btree.impls.BTreeCursorInitialState;
 import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
 import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleMode;
+import org.apache.hyracks.storage.am.common.ophelpers.FindTupleNoExactMatchPolicy;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
 import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
 import org.apache.hyracks.storage.common.EnforcedIndexCursor;
@@ -53,6 +55,11 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
     protected RangePredicate pred;
     protected ITupleReference lowKey;
     protected ITupleReference highKey;
+
+    protected FindTupleMode lowKeyFtm;
+    protected FindTupleMode highKeyFtm;
+    protected FindTupleNoExactMatchPolicy lowKeyFtp;
+    protected FindTupleNoExactMatchPolicy highKeyFtp;
     protected boolean yieldFirstCall;
 
     protected final IIndexCursorStats stats;
@@ -67,7 +74,7 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
     }
 
     @Override
-    public void doDestroy() throws HyracksDataException {
+    public void doDestroy() {
         // No Op all resources are released in the close call
     }
 
@@ -84,7 +91,8 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
             bufferCache.unpin(page0);
             page0 = nextLeaf;
             frame.setPage(page0);
-            frameTuple.reset(0);
+            frameTuple.newPage();
+            setCursorPosition();
             nextLeafPage = frame.getNextLeaf();
         } while (frame.getTupleCount() == 0 && nextLeafPage > 0);
     }
@@ -120,20 +128,28 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
         pageId = ((BTreeCursorInitialState) initialState).getPageId();
         frame.setPage(page0);
         frame.setMultiComparator(originalKeyCmp);
-        frameTuple.reset(0);
+        frameTuple.newPage();
         initCursorPosition(searchPred);
     }
 
     protected void initCursorPosition(ISearchPredicate searchPred) throws HyracksDataException {
-        pred = (RangePredicate) searchPred;
-        lowKey = pred.getLowKey();
-        highKey = pred.getHighKey();
-
+        setSearchPredicate(searchPred); // overridable hook: stores pred and the key / find-mode fields
         reusablePredicate.setLowKeyComparator(originalKeyCmp);
         reusablePredicate.setHighKeyComparator(pred.getHighKeyComparator());
         reusablePredicate.setHighKey(pred.getHighKey(), pred.isHighKeyInclusive());
         yieldFirstCall = false;
-        advanceTupleToLowKey();
+        setCursorPosition(); // computes the start/end tuple indexes for the current page
+    }
+
+    private void setCursorPosition() throws HyracksDataException {
+        int start = getLowKeyIndex();
+        int end = getHighKeyIndex();
+        if (end < start) { // empty index range: nothing to return from this page
+            frameTuple.consume();
+            return; // NOTE(review): yieldFirstCall keeps its previous value on this path
+        }
+        frameTuple.reset(start, end);
+        yieldFirstCall = shouldYieldFirstCall();
     }
 
     protected boolean isNextIncluded() throws HyracksDataException {
@@ -151,41 +167,6 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
         return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
     }
 
-    protected void advanceTupleToLowKey() throws HyracksDataException {
-        if (highKey != null && isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())
-                || lowKey != null && isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
-            /*
-             * If
-             * - The Lowest key from the frame is greater than the requested highKey
-             * OR
-             * - The highest key from the frame is less than the requested lowKey
-             * Then:
-             * No tuple will satisfy the search key. Consume the frameTuple to stop the search.
-             */
-            frameTuple.consume();
-            return;
-        } else if (lowKey == null) {
-            // no lowKey was specified, start from tupleIndex = 0
-            return;
-        }
-
-        //The requested key is somewhere within the frame tuples
-        boolean stop = false;
-        int counter = 0;
-        while (!stop && !frameTuple.isConsumed()) {
-            frameTuple.next();
-            stop = isLessOrEqual(lowKey, frameTuple, pred.isLowKeyInclusive());
-            counter++;
-        }
-
-        // Only proceed if needed
-        yieldFirstCall = shouldYieldFirstCall();
-        // Advance all columns to the proper position if needed
-        if (yieldFirstCall && counter - 1 > 0) {
-            frameTuple.skip(counter - 1);
-        }
-    }
-
     protected boolean shouldYieldFirstCall() throws HyracksDataException {
         // Proceed if the highKey is null or the current tuple's key is less than (or equal) the highKey
         return highKey == null || isLessOrEqual(frameTuple, highKey, pred.isHighKeyInclusive());
@@ -205,6 +186,65 @@ public class ColumnBTreeRangeSearchCursor extends EnforcedIndexCursor
         return cmp < 0 || inclusive && cmp == 0;
     }
 
+    protected int getLowKeyIndex() throws HyracksDataException {
+        if (lowKey == null) {
+            return 0; // no lower bound: start from the first tuple
+        } else if (isLessOrEqual(frame.getRightmostTuple(), lowKey, !pred.isLowKeyInclusive())) {
+            //The highest key from the frame is less than the requested lowKey
+            return frame.getTupleCount();
+        }
+
+        int index = frameTuple.findTupleIndex(lowKey, pred.getLowKeyComparator(), lowKeyFtm, lowKeyFtp);
+        if (pred.isLowKeyInclusive()) {
+            index++; // presumably LOWER_KEY yields the last tuple below lowKey, so +1 is the first candidate; confirm
+        } else {
+            if (index < 0) { // negative result is mapped to tupleCount (one past the last tuple)
+                index = frame.getTupleCount();
+            }
+        }
+
+        return index;
+    }
+
+    protected int getHighKeyIndex() throws HyracksDataException {
+        if (highKey == null) {
+            return frame.getTupleCount() - 1; // no upper bound: index of the last tuple
+        } else if (isLessOrEqual(highKey, frame.getLeftmostTuple(), !pred.isHighKeyInclusive())) {
+            return -1; // makes end < start in setCursorPosition(), which then consumes the frame tuple
+        }
+
+        int index = frameTuple.findTupleIndex(highKey, pred.getHighKeyComparator(), highKeyFtm, highKeyFtp);
+        if (pred.isHighKeyInclusive()) {
+            if (index < 0) {
+                index = frame.getTupleCount() - 1; // negative result is mapped to the last tuple
+            } else {
+                index--; // presumably HIGHER_KEY yields the first tuple above highKey; step back one; confirm
+            }
+        }
+
+        return index; // for an exclusive highKey the findTupleIndex result is returned as-is
+    }
+
+    protected void setSearchPredicate(ISearchPredicate searchPred) {
+        pred = (RangePredicate) searchPred;
+        lowKey = pred.getLowKey();
+        highKey = pred.getHighKey();
+
+        lowKeyFtm = FindTupleMode.EXCLUSIVE;
+        if (pred.isLowKeyInclusive()) {
+            lowKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY; // getLowKeyIndex() adds 1 to the result in this case
+        } else {
+            lowKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY;
+        }
+
+        highKeyFtm = FindTupleMode.EXCLUSIVE;
+        if (pred.isHighKeyInclusive()) {
+            highKeyFtp = FindTupleNoExactMatchPolicy.HIGHER_KEY; // getHighKeyIndex() steps a non-negative result back by 1
+        } else {
+            highKeyFtp = FindTupleNoExactMatchPolicy.LOWER_KEY;
+        }
+    }
+
     @Override
     public void doClose() throws HyracksDataException {
         frameTuple.close();
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
index d0e1e1db4d..6f95dbfdb5 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/tuples/AbstractColumnTupleReference.java
@@ -42,6 +42,7 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
     private final int numberOfPrimaryKeys;
     private int totalNumberOfMegaLeafNodes;
     private int numOfSkippedMegaLeafNodes;
+    private int endIndex;
     protected int tupleIndex;
 
     /**
@@ -78,47 +79,72 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
     }
 
     @Override
-    public final void reset(int startIndex) throws HyracksDataException {
-        tupleIndex = startIndex;
+    public final void newPage() throws HyracksDataException {
+        tupleIndex = 0; // endIndex is left unchanged here; it is assigned in reset(startIndex, endIndex)
         ByteBuffer pageZero = frame.getBuffer();
         pageZero.clear();
         pageZero.position(HEADER_SIZE);
 
         int numberOfTuples = frame.getTupleCount();
-        //Start new page and check whether we should skip reading non-key columns or not
-        boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
 
         //Start primary keys
         for (int i = 0; i < numberOfPrimaryKeys; i++) {
             IColumnBufferProvider provider = primaryKeyBufferProviders[i];
             provider.reset(frame);
-            startPrimaryKey(provider, tupleIndex, i, numberOfTuples);
+            startPrimaryKey(provider, i, numberOfTuples); // the start index is no longer passed to startPrimaryKey
         }
+    }
 
+    @Override
+    public final void reset(int startIndex, int endIndex) throws HyracksDataException {
+        tupleIndex = startIndex;
+        this.endIndex = endIndex; // read by isConsumed()
+        ByteBuffer pageZero = frame.getBuffer(); // NOTE(review): not re-positioned here (newPage() does that); confirm
+        int numberOfTuples = frame.getTupleCount();
+        //Start new page and check whether we should skip reading non-key columns or not
+        boolean readColumnPages = startNewPage(pageZero, frame.getNumberOfColumns(), numberOfTuples);
+        setPrimaryKeysAt(startIndex, startIndex); // second argument is setPrimaryKeysAt's skipCount parameter
         if (readColumnPages) {
             for (int i = 0; i < buffersProviders.length; i++) {
                 IColumnBufferProvider provider = buffersProviders[i];
                 //Release previous pinned pages if any
                 provider.releaseAll();
                 provider.reset(frame);
-                startColumn(provider, tupleIndex, i, numberOfTuples);
+                startColumn(provider, i, numberOfTuples);
             }
+            // Skip until before startIndex (i.e. stop at startIndex - 1)
+            skip(startIndex);
         } else {
             numOfSkippedMegaLeafNodes++;
         }
         totalNumberOfMegaLeafNodes++;
     }
 
+    @Override
+    public final void setAt(int startIndex) throws HyracksDataException {
+        int skipCount = startIndex - tupleIndex; // assumes startIndex >= tupleIndex — TODO confirm
+        tupleIndex = startIndex;
+        setPrimaryKeysAt(startIndex, skipCount);
+        // -1 because next would be called for all columns
+        skip(skipCount - 1); // NOTE(review): endIndex is not updated here, yet isConsumed() compares against it
+    }
+
+    protected abstract void setPrimaryKeysAt(int index, int skipCount) throws HyracksDataException;
+
     protected abstract boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples);
 
-    protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException;
+    protected abstract void startPrimaryKey(IColumnBufferProvider bufferProvider, int ordinal, int numberOfTuples)
+            throws HyracksDataException;
 
-    protected abstract void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal,
-            int numberOfTuples) throws HyracksDataException;
+    protected abstract void startColumn(IColumnBufferProvider buffersProvider, int ordinal, int numberOfTuples)
+            throws HyracksDataException;
 
     protected abstract void onNext() throws HyracksDataException;
 
+    protected final int getTupleCount() {
+        return frame.getTupleCount();
+    }
+
     @Override
     public final void next() throws HyracksDataException {
         onNext();
@@ -132,7 +158,7 @@ public abstract class AbstractColumnTupleReference implements IColumnTupleIterat
 
     @Override
     public final boolean isConsumed() {
-        return tupleIndex >= frame.getTupleCount();
+        return tupleIndex >= endIndex; // NOTE(review): the cursor passes an inclusive last index to reset(); confirm `>=`
     }
 
     @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 8c8a550627..de4c4a91d6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -103,12 +103,6 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOpe
             cursor.next();
             matchingTupleCount++;
             ITupleReference tuple = cursor.getTuple();
-            if (tupleFilter != null) {
-                referenceFilterTuple.reset(tuple);
-                if (!tupleFilter.accept(referenceFilterTuple)) {
-                    continue;
-                }
-            }
             tb.reset();
 
             if (retainInput && retainMissing) {
@@ -124,7 +118,13 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable extends BTreeSearchOpe
                     tb.addFieldEndOffset();
                 }
             }
-            writeTupleToOutput(tuple);
+            ITupleReference projectedTuple = writeTupleToOutput(tuple);
+            if (tupleFilter != null) {
+                referenceFilterTuple.reset(projectedTuple);
+                if (!tupleFilter.accept(referenceFilterTuple)) {
+                    continue;
+                }
+            }
             FrameUtils.appendToWriter(writer, appender, tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
             if (outputLimit >= 0 && ++outputCount >= outputLimit) {
                 finished = true;