You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wy...@apache.org on 2023/03/15 15:20:07 UTC

[asterixdb] branch master updated: [ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format

This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 37e3f5cc0a [ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format
37e3f5cc0a is described below

commit 37e3f5cc0a6a53bab348000481a981178cbe31cb
Author: Wail Alkowaileet <wa...@gmail.com>
AuthorDate: Tue Mar 14 17:33:12 2023 -0700

    [ASTERIXDB-3137][STO] Introduce LSM write operations for columnar format
    
    - user mode changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    This patch adds support for writing columnar
    values to LSM indexes. By "write" we mean the LSM
    flush, merge, and load operations.
    
    Interface changes:
    ITupleProjector#project() now returns ITupleReference
    
    Change-Id: Ibe494d6de4478954df8e2f3ba0941391934954c2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17424
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wail Alkowaileet <wa...@gmail.com>
    Reviewed-by: Murtadha Hubail <mh...@apache.org>
---
 .../assembler/AbstractNestedValueAssembler.java    | 111 ++++++++++++
 .../assembler/AbstractPrimitiveValueAssembler.java |  96 +++++++++++
 .../column/assembler/AbstractValueAssembler.java   | 110 ++++++++++++
 .../column/assembler/ArrayValueAssembler.java      |  75 ++++++++
 .../assembler/ArrayWithUnionValueAssembler.java    |  67 +++++++
 .../asterix/column/assembler/AssemblerInfo.java    | 100 +++++++++++
 .../asterix/column/assembler/EmptyAssembler.java   |  34 +++-
 .../column/assembler/ObjectValueAssembler.java     |  73 ++++++++
 .../column/assembler/PrimitiveValueAssembler.java  |  47 +++++
 .../assembler/RepeatedPrimitiveValueAssembler.java |  96 +++++++++++
 .../value/AbstractFixedLengthValueGetter.java      |  19 +-
 .../column/assembler/value/BooleanValueGetter.java |  21 ++-
 .../column/assembler/value/DoubleValueGetter.java  |  21 ++-
 .../column/assembler/value/IValueGetter.java       |  14 +-
 .../assembler/value/IValueGetterFactory.java       |  13 +-
 .../column/assembler/value/LongValueGetter.java    |  21 ++-
 .../column/assembler/value/MissingValueGetter.java |  27 ++-
 .../column/assembler/value/NullValueGetter.java    |  27 ++-
 .../column/assembler/value/StringValueGetter.java  |  31 ++--
 .../column/assembler/value/UUIDValueGetter.java    |  24 ++-
 .../column/assembler/value/ValueGetterFactory.java |  50 ++++++
 .../operation/lsm/flush/BatchFinalizerVisitor.java | 115 ++++++++++++
 .../operation/lsm/flush/ColumnTransformer.java     | 183 ++++++++++++++++++++
 .../flush/FlushColumnTupleReaderWriterFactory.java |  49 ++++++
 .../lsm/flush/FlushColumnTupleWithMetaWriter.java  |  49 ++++++
 .../lsm/flush/FlushColumnTupleWriter.java          | 121 +++++++++++++
 .../load/LoadColumnTupleReaderWriterFactory.java   |  37 ++++
 .../operation/lsm/load/LoadColumnTupleWriter.java  |  21 ++-
 .../operation/lsm/merge/IEndOfPageCallBack.java    |  24 ++-
 .../lsm/merge/MergeColumnReadMetadata.java         |  95 ++++++++++
 .../lsm/merge/MergeColumnTupleProjector.java       |  61 +++++++
 .../lsm/merge/MergeColumnTupleReader.java          |  40 +++++
 .../merge/MergeColumnTupleReaderWriterFactory.java |  45 +++++
 .../lsm/merge/MergeColumnTupleWriter.java          | 192 +++++++++++++++++++++
 .../lsm/merge/MergeColumnWriteMetadata.java        | 115 ++++++++++++
 .../tuple/AbstractAsterixColumnTupleReference.java | 140 +++++++++++++++
 .../column/tuple/MergeColumnTupleReference.java    | 100 +++++++++++
 .../am/common/impls/DefaultTupleProjector.java     |   3 +-
 .../storage/common/projection/ITupleProjector.java |   2 +-
 39 files changed, 2364 insertions(+), 105 deletions(-)

diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
new file mode 100644
index 0000000000..1a4c3ef2e7
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractNestedValueAssembler.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+abstract class AbstractNestedValueAssembler extends AbstractValueAssembler {
+    protected final ArrayBackedValueStorage storage; // allocated once in the constructor
+
+    AbstractNestedValueAssembler(int level, AssemblerInfo info) {
+        super(level, info);
+        storage = new ArrayBackedValueStorage();
+    }
+
+    /**
+     * @return whether the nested assembler was started (and not yet ended)
+     */
+    final boolean isStarted() {
+        return started;
+    }
+
+    /**
+     * Add a nested value
+     *
+     * @param value contains the value and its information
+     */
+    abstract void addValue(AbstractValueAssembler value) throws HyracksDataException;
+
+    /**
+     * Add a nested {@link ATypeTag#NULL}
+     *
+     * @param value contains the value's information
+     */
+    abstract void addNull(AbstractValueAssembler value) throws HyracksDataException;
+
+    /**
+     * Add a nested {@link ATypeTag#MISSING}
+     */
+    void addMissing() throws HyracksDataException {
+        //By default, MISSING is ignored (no-op); subclasses may override
+    }
+
+    @Override
+    final void addNullToAncestor(int nullLevel) throws HyracksDataException {
+        AbstractNestedValueAssembler parent = getParent();
+        if (nullLevel + 1 == level) { // the null is exactly one level above: the direct parent records it
+            parent.start();
+            parent.addNull(this);
+            return;
+        }
+        parent.addNullToAncestor(nullLevel); // otherwise pass the request further up
+    }
+
+    @Override
+    final void addMissingToAncestor(int missingLevel) throws HyracksDataException {
+        AbstractNestedValueAssembler parent = getParent();
+        if (missingLevel + 1 == level) { // the missing is exactly one level above: the direct parent records it
+            parent.start();
+            parent.addMissing();
+            return;
+        }
+        parent.addMissingToAncestor(missingLevel); // otherwise pass the request further up
+    }
+
+    /**
+     * Recursively start the path of this assembler by starting all un-started parents
+     */
+    public final void start() {
+        if (started) {
+            return;
+        }
+        started = true;
+        reset(); // no-op in AbstractValueAssembler unless a subclass overrides it
+        AbstractNestedValueAssembler parent = getParent();
+        if (parent != null && !parent.isStarted()) {
+            parent.start();
+        }
+    }
+
+    /**
+     * End the assembler: if started, add this nested value to its parent and mark it un-started
+     */
+    public final void end() throws HyracksDataException {
+        if (started) {
+            addValueToParent();
+            started = false;
+        }
+
+        if (isDelegate()) { // a delegate also ends its parent, whether or not it was started itself
+            getParent().end();
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
new file mode 100644
index 0000000000..9f1809dfe3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractPrimitiveValueAssembler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public abstract class AbstractPrimitiveValueAssembler extends AbstractValueAssembler {
+    /**
+     * An indicator to go to the next value
+     */
+    public static final int NEXT_ASSEMBLER = -1;
+    protected final IValueGetter primitiveValueGetter;
+    protected final IColumnValuesReader reader;
+
+    AbstractPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader,
+            IValueGetter primitiveValueGetter) {
+        super(level, info);
+        this.primitiveValueGetter = primitiveValueGetter;
+        this.reader = reader;
+    }
+
+    public final void reset(AbstractBytesInputStream in, int startIndex, int numberOfTuples)
+            throws HyracksDataException {
+        reader.reset(in, numberOfTuples);
+        reader.skip(startIndex); // presumably skips the first startIndex values; TODO confirm
+    }
+
+    @Override
+    public final IValueReference getValue() throws HyracksDataException {
+        return primitiveValueGetter.getValue(reader); // the getter reads from this assembler's reader
+    }
+
+    @Override
+    void addNullToAncestor(int nullLevel) throws HyracksDataException {
+        AbstractNestedValueAssembler parent = getParent();
+        if (nullLevel + 1 == level) { // the null is exactly one level above: the direct parent records it
+            parent.start();
+            parent.addNull(this);
+            return;
+        }
+        parent.addNullToAncestor(nullLevel); // otherwise pass the request further up
+    }
+
+    @Override
+    void addMissingToAncestor(int missingLevel) throws HyracksDataException {
+        AbstractNestedValueAssembler parent = getParent();
+        if (missingLevel + 1 == level) { // the missing is exactly one level above: the direct parent records it
+            parent.start();
+            parent.addMissing();
+            return;
+        }
+        parent.addMissingToAncestor(missingLevel); // otherwise pass the request further up
+    }
+
+    @Override
+    final void addValueToParent() throws HyracksDataException {
+        AbstractNestedValueAssembler parent = getParent();
+        parent.start(); // start the parent (and its un-started ancestors) before adding
+        getParent().addValue(this); // getParent() returns the same object as 'parent'
+    }
+
+    public final int getColumnIndex() {
+        return reader.getColumnIndex();
+    }
+
+    public final void skip(int count) throws HyracksDataException {
+        reader.skip(count);
+    }
+
+    /**
+     * Move to the next primitive value assembler
+     *
+     * @return the index of the next value; PrimitiveValueAssembler always returns -1 ({@link #NEXT_ASSEMBLER})
+     */
+    public abstract int next() throws HyracksDataException;
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java
new file mode 100644
index 0000000000..0071917074
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AbstractValueAssembler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public abstract class AbstractValueAssembler {
+    protected static final VoidPointable NULL; // one byte: the serialized NULL type tag
+    protected static final VoidPointable MISSING; // one byte: the serialized MISSING type tag
+    private final AbstractNestedValueAssembler parent;
+    private final IValueReference fieldName;
+    private final int fieldIndex;
+    private final boolean delegate;
+    protected final int level;
+    protected boolean started; // toggled by AbstractNestedValueAssembler#start()/end()
+
+    static {
+        NULL = new VoidPointable();
+        NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1);
+
+        MISSING = new VoidPointable();
+        MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1);
+    }
+
+    protected AbstractValueAssembler(int level, AssemblerInfo info) {
+        this.parent = info.getParent();
+        this.fieldName = info.getFieldName();
+        this.fieldIndex = info.getFieldIndex();
+        this.delegate = info.isDelegate();
+        this.level = level;
+    }
+
+    /**
+     * Add {@link ATypeTag#NULL} value to the ancestor at {@code nullLevel}
+     *
+     * @param nullLevel at what level the null occurred
+     */
+    abstract void addNullToAncestor(int nullLevel) throws HyracksDataException;
+
+    /**
+     * Add {@link ATypeTag#MISSING} value to the ancestor at {@code missingLevel}
+     *
+     * @param missingLevel at what level the missing occurred
+     */
+    abstract void addMissingToAncestor(int missingLevel) throws HyracksDataException;
+
+    /**
+     * Add the value of this assembler to its parent
+     */
+    abstract void addValueToParent() throws HyracksDataException;
+
+    /**
+     * @return the assembled value
+     */
+    public abstract IValueReference getValue() throws HyracksDataException;
+
+    /**
+     * Reset assembler (does nothing here; subclasses override it to clear their state)
+     */
+    void reset() {
+        //NoOp
+    }
+
+    /**
+     * @return whether this assembler is the delegate (or representative) of its siblings
+     */
+    final boolean isDelegate() {
+        return delegate;
+    }
+
+    /**
+     * @return parent of the assembler
+     */
+    final AbstractNestedValueAssembler getParent() {
+        return parent;
+    }
+
+    /**
+     * Return the field name of the value of this assembler
+     */
+    final IValueReference getFieldName() {
+        return fieldName;
+    }
+
+    /**
+     * Return the field index of the value of this assembler (for closed types)
+     */
+    final int getFieldIndex() {
+        return fieldIndex;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java
new file mode 100644
index 0000000000..2352e7fc96
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayValueAssembler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.builders.ListBuilderFactory;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class ArrayValueAssembler extends AbstractNestedValueAssembler {
+    private final IAsterixListBuilder listBuilder; // created for the collection's type tag
+    private final AbstractCollectionType collectionType; // declared type taken from AssemblerInfo
+    private final int firstValueIndex;
+
+    ArrayValueAssembler(int level, AssemblerInfo info, int firstValueIndex) {
+        super(level, info);
+        this.firstValueIndex = firstValueIndex;
+        collectionType = (AbstractCollectionType) info.getDeclaredType();
+        listBuilder = new ListBuilderFactory().create(collectionType.getTypeTag());
+    }
+
+    final int getFirstValueIndex() {
+        return firstValueIndex;
+    }
+
+    @Override
+    void reset() {
+        listBuilder.reset(collectionType); // prepare the builder for a new list of the declared type
+        storage.reset();
+    }
+
+    @Override
+    void addValue(AbstractValueAssembler value) throws HyracksDataException {
+        listBuilder.addItem(value.getValue());
+    }
+
+    @Override
+    void addNull(AbstractValueAssembler value) throws HyracksDataException {
+        listBuilder.addItem(NULL); // 'value' is not used: the item is the shared NULL constant
+    }
+
+    @Override
+    void addMissing() throws HyracksDataException {
+        listBuilder.addItem(MISSING); // unlike the no-op base implementation, MISSING becomes a list item
+    }
+
+    @Override
+    void addValueToParent() throws HyracksDataException {
+        storage.reset(); // discard whatever was serialized before
+        listBuilder.write(storage.getDataOutput(), true); // 'true' presumably writes the type tag; TODO confirm
+        getParent().addValue(this); // the parent reads the list back through getValue()
+    }
+
+    @Override
+    public IValueReference getValue() {
+        return storage; // holds what the last addValueToParent() wrote
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java
new file mode 100644
index 0000000000..dcd240bc78
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ArrayWithUnionValueAssembler.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class ArrayWithUnionValueAssembler extends ArrayValueAssembler {
+    private final int numberOfUnionChildren; // child count of the item's UnionSchemaNode
+    private int numberOfAddedValues; // counts addValue/addNull/addMissing calls; zeroed or wrapped below
+    private boolean nonMissingValueAdded; // set by addValue/addNull, cleared by reset/addMissing
+
+    ArrayWithUnionValueAssembler(int level, AssemblerInfo info, int firstValueIndex, AbstractSchemaNode itemNode) {
+        super(level, info, firstValueIndex);
+        this.numberOfUnionChildren = ((UnionSchemaNode) itemNode).getChildren().size(); // itemNode must be a union
+    }
+
+    @Override
+    void reset() {
+        numberOfAddedValues = 0;
+        nonMissingValueAdded = false;
+        super.reset();
+    }
+
+    @Override
+    void addValue(AbstractValueAssembler value) throws HyracksDataException {
+        nonMissingValueAdded = true;
+        numberOfAddedValues++;
+        super.addValue(value);
+    }
+
+    @Override
+    void addNull(AbstractValueAssembler value) throws HyracksDataException {
+        nonMissingValueAdded = true;
+        numberOfAddedValues++;
+        super.addNull(value);
+    }
+
+    @Override
+    void addMissing() throws HyracksDataException {
+        numberOfAddedValues++;
+        if (nonMissingValueAdded && numberOfAddedValues >= numberOfUnionChildren) {
+            nonMissingValueAdded = false; // a value or null was already added: no MISSING item is emitted
+            numberOfAddedValues = numberOfAddedValues % numberOfUnionChildren; // wrap the counter
+        } else if (numberOfAddedValues == numberOfUnionChildren) {
+            super.addMissing(); // count reached the union size with no value/null added: emit one MISSING
+            numberOfAddedValues = 0;
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
new file mode 100644
index 0000000000..712e65c842
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/AssemblerInfo.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+public class AssemblerInfo {
+    private final AbstractNestedValueAssembler parent;
+    private final IAType declaredType;
+    private final boolean delegate;
+    private final IValueReference fieldName; // stored with a leading STRING type tag, or null
+    private final int fieldIndex; // -1 when the caller supplies no index
+
+    public AssemblerInfo() {
+        this(BuiltinType.ANY, null, false); // no parent, not a delegate
+    }
+
+    public AssemblerInfo(IAType declaredType, EmptyAssembler parent) {
+        this(declaredType, parent, false);
+    }
+
+    public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate) {
+        this(declaredType, parent, delegate, null, -1); // neither a field name nor a field index
+    }
+
+    public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+            IValueReference fieldName) {
+        this(declaredType, parent, delegate, fieldName, -1);
+    }
+
+    public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate, int fieldIndex) {
+        this(declaredType, parent, delegate, null, fieldIndex);
+    }
+
+    public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+            IValueReference fieldName, int fieldIndex) {
+        this(declaredType, parent, delegate, fieldName, fieldIndex, false); // fieldName is treated as untagged
+    }
+
+    public AssemblerInfo(IAType declaredType, AbstractNestedValueAssembler parent, boolean delegate,
+            IValueReference fieldName, int fieldIndex, boolean fieldNameTagged) {
+        this.parent = parent;
+        this.declaredType = declaredType;
+        this.delegate = delegate;
+        this.fieldName = fieldNameTagged ? fieldName : createTaggedFieldName(fieldName); // tag it unless already tagged
+        this.fieldIndex = fieldIndex;
+    }
+
+    private IValueReference createTaggedFieldName(IValueReference fieldName) {
+        if (fieldName == null) {
+            return null;
+        }
+        byte[] storage = new byte[1 + fieldName.getLength()]; // one extra leading byte for the type tag
+        storage[0] = ATypeTag.STRING.serialize();
+        System.arraycopy(fieldName.getByteArray(), fieldName.getStartOffset(), storage, 1, fieldName.getLength());
+        VoidPointable taggedFieldName = new VoidPointable();
+        taggedFieldName.set(storage, 0, storage.length);
+        return taggedFieldName; // points at a private copy, not at the caller's bytes
+    }
+
+    public AbstractNestedValueAssembler getParent() {
+        return parent;
+    }
+
+    public IAType getDeclaredType() {
+        return declaredType;
+    }
+
+    public boolean isDelegate() {
+        return delegate;
+    }
+
+    public IValueReference getFieldName() {
+        return fieldName;
+    }
+
+    public int getFieldIndex() {
+        return fieldIndex;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java
similarity index 52%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java
index 8ca1a82541..406a4016e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/EmptyAssembler.java
@@ -16,14 +16,34 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+public class EmptyAssembler extends AbstractNestedValueAssembler {
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    EmptyAssembler() {
+        super(-1, new AssemblerInfo()); // level -1; the default info has a null parent and is not a delegate
+    }
+
+    @Override
+    void addValue(AbstractValueAssembler value) throws HyracksDataException {
+        //noOp: values added to this assembler are discarded
+    }
+
+    @Override
+    void addValueToParent() throws HyracksDataException {
+        //noOp: this assembler's parent is null
+    }
+
+    @Override
+    void addNull(AbstractValueAssembler value) throws HyracksDataException {
+        //noOp: nulls added to this assembler are discarded
+    }
+
+    @Override
+    public IValueReference getValue() throws HyracksDataException {
+        return null; // there is never an assembled value to return
+    }
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java
new file mode 100644
index 0000000000..536ce02005
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/ObjectValueAssembler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.builders.RecordBuilder;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class ObjectValueAssembler extends AbstractNestedValueAssembler {
+    private final RecordBuilder recordBuilder;
+    private final ARecordType recordType; // declared type taken from AssemblerInfo
+
+    ObjectValueAssembler(int level, AssemblerInfo info) {
+        super(level, info);
+        recordBuilder = new RecordBuilder();
+        recordType = (ARecordType) info.getDeclaredType();
+    }
+
+    @Override
+    void reset() {
+        recordBuilder.reset(recordType); // prepare the builder for a new record of the declared type
+        storage.reset();
+    }
+
+    @Override
+    void addValue(AbstractValueAssembler value) throws HyracksDataException {
+        int valueIndex = value.getFieldIndex();
+        if (valueIndex >= 0) { // a non-negative index adds the field by position
+            recordBuilder.addField(valueIndex, value.getValue());
+        } else { // otherwise the field is added by its name
+            recordBuilder.addField(value.getFieldName(), value.getValue());
+        }
+    }
+
+    @Override
+    void addNull(AbstractValueAssembler value) throws HyracksDataException {
+        int valueIndex = value.getFieldIndex();
+        if (valueIndex >= 0) { // same dispatch as addValue, with the shared NULL constant as the value
+            recordBuilder.addField(valueIndex, NULL);
+        } else {
+            recordBuilder.addField(value.getFieldName(), NULL);
+        }
+    }
+
+    @Override
+    void addValueToParent() throws HyracksDataException {
+        storage.reset(); // discard whatever was serialized before
+        recordBuilder.write(storage.getDataOutput(), true); // 'true' presumably writes the type tag; TODO confirm
+        getParent().addValue(this); // the parent reads the record back through getValue()
+    }
+
+    @Override
+    public IValueReference getValue() {
+        return storage; // holds what the last addValueToParent() wrote
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
new file mode 100644
index 0000000000..9592a12da9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/PrimitiveValueAssembler.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class PrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+    /** Assembler for one primitive column; all arguments are passed through to the base class. */
+    PrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader, IValueGetter primitiveValue) {
+        super(level, info, reader, primitiveValue);
+    }
+    /** Consumes one reader entry. Throws IllegalStateException when the reader has no more values. */
+    @Override
+    public int next() throws HyracksDataException {
+        if (!reader.next()) {
+            throw new IllegalStateException("no more values");
+        } else if (reader.isNull() && (isDelegate() || reader.getLevel() + 1 == level)) {
+            addNullToAncestor(reader.getLevel());
+        } else if (reader.isValue()) {
+            addValueToParent();
+        }

+        if (isDelegate()) {
+            getParent().end();
+        }
+        //Go to next value
+        return -1;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
new file mode 100644
index 0000000000..8fa228f0df
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/RepeatedPrimitiveValueAssembler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class RepeatedPrimitiveValueAssembler extends AbstractPrimitiveValueAssembler {
+    private final List<ArrayValueAssembler> arrays;
+    /** Starts with no arrays registered; all arguments are passed through to the base class. */
+    RepeatedPrimitiveValueAssembler(int level, AssemblerInfo info, IColumnValuesReader reader,
+            IValueGetter primitiveValue) {
+        super(level, info, reader, primitiveValue);
+        this.arrays = new ArrayList<>();
+    }
+    /** Registers an array assembler; next() ends registered arrays in list order, driven by the delimiter index. */
+    public void addArray(ArrayValueAssembler assembler) {
+        arrays.add(assembler);
+    }
+    /** Consumes one reader entry; returns NEXT_ASSEMBLER or the first-value index of an unfinished array. */
+    @Override
+    public int next() throws HyracksDataException {
+        if (!reader.next()) {
+            throw new IllegalStateException("No more values");
+        } else if (reader.isNull() && (!arrays.isEmpty() || reader.getLevel() + 1 == level)) {
+            /*
+             * There are two cases here for where the null belongs to:
+             * 1- If the null is an array item, then add it
+             * 2- If the null is an ancestor, then we only add null if this column is the array delegate
+             * (i.e., !arrays.isEmpty())
+             */
+            addNullToAncestor(reader.getLevel());
+        } else if (reader.isMissing() && reader.getLevel() + 1 == level) {
+            /*
+             * Add a missing item
+             */
+            addMissingToAncestor(reader.getLevel());
+        } else if (reader.isValue()) {
+            addValueToParent();
+        }
+
+        if (isDelegate()) {
+            getParent().end();
+        }
+
+        //Initially, go to the next primitive assembler
+        int nextIndex = NEXT_ASSEMBLER;
+        if (!arrays.isEmpty()) {
+            /*
+             * This assembler is a delegate of a repeated group
+             * The delimiter index tells us that this assembler is responsible for a finished group
+             */
+            int delimiterIndex = reader.getDelimiterIndex();
+            if (delimiterIndex < arrays.size() && reader.isDelimiter()) {
+                //Also finish the next group
+                delimiterIndex++;
+            }
+
+            int numberOfFinishedGroups = Math.min(delimiterIndex, arrays.size());
+            for (int i = 0; i < numberOfFinishedGroups; i++) {
+                //I'm the delegate for this group of repeated values and the group(s) is finished
+                ArrayValueAssembler assembler = arrays.get(i);
+                assembler.end();
+            }
+
+            //Is the repeated group (determined by the delimiter index) still unfinished?
+            if (delimiterIndex < arrays.size()) {
+                //Yes, go to the first value of the unfinished repeated group
+                nextIndex = arrays.get(delimiterIndex).getFirstValueIndex();
+            }
+        }
+
+        //Go to next value
+        return nextIndex;
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java
similarity index 58%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java
index 8ca1a82541..aeef686aca 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/AbstractFixedLengthValueGetter.java
@@ -16,14 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+public abstract class AbstractFixedLengthValueGetter implements IValueGetter {
+    protected final VoidPointable value; // points at one buffer of 1 + nonTaggedLength bytes
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    AbstractFixedLengthValueGetter(ATypeTag typeTag, int nonTaggedLength) {
+        //+1 for the type tag, which is stored at index 0; the payload bytes follow from index 1
+        byte[] storage = new byte[1 + nonTaggedLength];
+        storage[0] = typeTag.serialize();
+        value = new VoidPointable();
+        value.set(storage, 0, storage.length);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java
index 8ca1a82541..4a776abfcb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/BooleanValueGetter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.BooleanPointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+class BooleanValueGetter extends AbstractFixedLengthValueGetter {
+    BooleanValueGetter() {
+        super(ATypeTag.BOOLEAN, 1); // one payload byte after the type tag
+    }
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        BooleanPointable.setBoolean(value.getByteArray(), value.getStartOffset() + 1, reader.getBoolean());
+        return value; // same pointable on every call; its payload is overwritten by the next getValue()
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java
index 8ca1a82541..2e88896b7e 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/DoubleValueGetter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+class DoubleValueGetter extends AbstractFixedLengthValueGetter {
+    DoubleValueGetter() {
+        super(ATypeTag.DOUBLE, Double.BYTES); // Double.BYTES payload bytes after the type tag
+    }
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        DoublePointable.setDouble(value.getByteArray(), value.getStartOffset() + 1, reader.getDouble());
+        return value; // same pointable on every call; its payload is overwritten by the next getValue()
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java
similarity index 67%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java
index 8ca1a82541..9e58ab8748 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetter.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.data.std.api.IValueReference;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+@FunctionalInterface
+public interface IValueGetter {
+    IValueReference getValue(IColumnValuesReader reader); // produces a value reference for the given reader
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
similarity index 67%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
index 8ca1a82541..0b58cfc4e6 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/IValueGetterFactory.java
@@ -16,14 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.om.types.ATypeTag;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+@FunctionalInterface
+public interface IValueGetterFactory {
+    IValueGetter createValueGetter(ATypeTag typeTag); // returns the getter to use for the given type tag
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
index 8ca1a82541..e76e3c9564 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/LongValueGetter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.LongPointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+class LongValueGetter extends AbstractFixedLengthValueGetter {
+    LongValueGetter() {
+        super(ATypeTag.BIGINT, Long.BYTES); // Long.BYTES payload bytes after the BIGINT type tag
+    }
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        LongPointable.setLong(value.getByteArray(), value.getStartOffset() + 1, reader.getLong());
+        return value; // same pointable on every call; its payload is overwritten by the next getValue()
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java
similarity index 52%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java
index 8ca1a82541..1ae84ee5c7 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/MissingValueGetter.java
@@ -16,14 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+public class MissingValueGetter implements IValueGetter {
+    public static final IValueGetter INSTANCE = new MissingValueGetter();
+    private static final VoidPointable MISSING;
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    static {
+        MISSING = new VoidPointable();
+        MISSING.set(new byte[] { ATypeTag.MISSING.serialize() }, 0, 1);
+    }
+    /** Private: the only instance is {@link #INSTANCE}. */
+    private MissingValueGetter() {
+    }
+    /** Always returns the shared one-byte MISSING value; {@code reader} is not consulted. */
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        return MISSING;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java
similarity index 53%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java
index 8ca1a82541..e05025211f 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/NullValueGetter.java
@@ -16,14 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+public class NullValueGetter implements IValueGetter {
+    public static final IValueGetter INSTANCE = new NullValueGetter();
+    private static final VoidPointable NULL;
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    static {
+        NULL = new VoidPointable();
+        NULL.set(new byte[] { ATypeTag.NULL.serialize() }, 0, 1);
+    }
+    /** Private: the only instance is {@link #INSTANCE}. */
+    private NullValueGetter() {
+    }
+    /** Always returns the shared one-byte NULL value; {@code reader} is not consulted. */
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        return NULL;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java
similarity index 50%
copy from hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java
index 00cb0c5486..1dd1aa7169 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/StringValueGetter.java
@@ -16,26 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.am.common.impls;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-import org.apache.hyracks.storage.common.projection.ITupleProjector;
+class StringValueGetter implements IValueGetter {
+    private final ArrayBackedValueStorage value; // reused for every call to getValue()
 
-class DefaultTupleProjector implements ITupleProjector {
-    public static final ITupleProjector INSTANCE = new DefaultTupleProjector();
-
-    private DefaultTupleProjector() {
+    public StringValueGetter() {
+        value = new ArrayBackedValueStorage();
     }
 
     @Override
-    public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
-        for (int i = 0; i < tuple.getFieldCount(); i++) {
-            dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
-            tb.addFieldEndOffset();
-        }
+    public IValueReference getValue(IColumnValuesReader reader) {
+        IValueReference string = reader.getBytes();
+        value.setSize(1 + string.getLength()); // one byte for the type tag plus the string's bytes
+        byte[] bytes = value.getByteArray(); // fetched after setSize(), which is the call that sizes the storage
+        bytes[0] = ATypeTag.STRING.serialize();
+        System.arraycopy(string.getByteArray(), string.getStartOffset(), bytes, 1, string.getLength());
+        return value; // same storage on every call; overwritten by the next getValue()
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java
similarity index 55%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java
index 8ca1a82541..135ed8571b 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/UUIDValueGetter.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.assembler.value;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.data.std.api.IValueReference;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+class UUIDValueGetter extends AbstractFixedLengthValueGetter {
+    UUIDValueGetter() {
+        super(ATypeTag.UUID, 16); // 16 payload bytes after the type tag
+    }
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
-}
+    @Override
+    public IValueReference getValue(IColumnValuesReader reader) {
+        IValueReference uuid = reader.getBytes(); // NOTE(review): assumed to be at most 16 bytes long; confirm
+        System.arraycopy(uuid.getByteArray(), uuid.getStartOffset(), value.getByteArray(), value.getStartOffset() + 1,
+                uuid.getLength());
+        return value; // same pointable on every call; its payload is overwritten by the next getValue()
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
new file mode 100644
index 0000000000..5f7fd7e096
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/assembler/value/ValueGetterFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.assembler.value;
+
+import org.apache.asterix.om.types.ATypeTag;
+
+public class ValueGetterFactory implements IValueGetterFactory {
+    public static final IValueGetterFactory INSTANCE = new ValueGetterFactory();
+    /** Private: the only instance is {@link #INSTANCE}. */
+    private ValueGetterFactory() {
+    }
+    /** Shared singletons for NULL/MISSING, a new getter per call otherwise; unsupported tags throw. */
+    @Override
+    public IValueGetter createValueGetter(ATypeTag typeTag) {
+        switch (typeTag) {
+            case NULL:
+                return NullValueGetter.INSTANCE;
+            case MISSING:
+                return MissingValueGetter.INSTANCE;
+            case BOOLEAN:
+                return new BooleanValueGetter();
+            case BIGINT:
+                return new LongValueGetter();
+            case DOUBLE:
+                return new DoubleValueGetter();
+            case STRING:
+                return new StringValueGetter();
+            case UUID:
+                return new UUIDValueGetter();
+            default:
+                throw new UnsupportedOperationException(typeTag + " is not supported");
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
new file mode 100644
index 0000000000..4cbe09bc9c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/BatchFinalizerVisitor.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ISchemaNodeVisitor;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.values.IColumnBatchWriter;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public final class BatchFinalizerVisitor implements ISchemaNodeVisitor<Void, AbstractSchemaNestedNode> {
+    private final FlushColumnMetadata columnSchemaMetadata;
+    private final IColumnValuesWriter[] primaryKeyWriters;
+    private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+    private int level;
+    /** Orders queued writers by descending estimated size and caches the primary-key writers. */
+    public BatchFinalizerVisitor(FlushColumnMetadata columnSchemaMetadata) {
+        this.columnSchemaMetadata = columnSchemaMetadata;
+        orderedColumns = new PriorityQueue<>(
+                Comparator.comparingInt((IColumnValuesWriter writer) -> writer.getEstimatedSize()).reversed());
+        int numberOfPrimaryKeys = columnSchemaMetadata.getNumberOfPrimaryKeys();
+        primaryKeyWriters = new IColumnValuesWriter[numberOfPrimaryKeys];
+        for (int i = 0; i < numberOfPrimaryKeys; i++) {
+            primaryKeyWriters[i] = columnSchemaMetadata.getWriter(i);
+        }
+        level = -1;
+    }
+    /** Visits the schema to collect the column writers, writes the batch, and returns the summed write results. */
+    public int finalizeBatch(IColumnBatchWriter batchWriter, FlushColumnMetadata columnMetadata)
+            throws HyracksDataException {
+        orderedColumns.clear();
+        // NOTE(review): visit() uses the constructor's metadata, not columnMetadata; confirm they are the same
+        columnMetadata.getRoot().accept(this, null);
+        if (columnMetadata.getMetaRoot() != null) {
+            columnMetadata.getMetaRoot().accept(this, null);
+        }
+        int allocatedSpace = batchWriter.writePrimaryKeyColumns(primaryKeyWriters);
+        allocatedSpace += batchWriter.writeColumns(orderedColumns);
+        return allocatedSpace;
+    }
+    /** Goes one level deeper, flushes the object's definition levels, visits each child, then resets the node. */
+    @Override
+    public Void visit(ObjectSchemaNode objectNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+        level++;
+        columnSchemaMetadata.flushDefinitionLevels(level, arg, objectNode);
+        List<AbstractSchemaNode> children = objectNode.getChildren();
+        for (int i = 0; i < children.size(); i++) {
+            children.get(i).accept(this, objectNode);
+        }
+        objectNode.setCounter(0);
+        columnSchemaMetadata.clearDefinitionLevels(objectNode);
+        level--;
+        return null;
+    }
+    /** Goes one level deeper, flushes the collection's definition levels, visits its item, then resets the node. */
+    @Override
+    public Void visit(AbstractCollectionSchemaNode collectionNode, AbstractSchemaNestedNode arg)
+            throws HyracksDataException {
+        level++;
+        columnSchemaMetadata.flushDefinitionLevels(level, arg, collectionNode);
+        collectionNode.getItemNode().accept(this, collectionNode);
+        collectionNode.setCounter(0);
+        columnSchemaMetadata.clearDefinitionLevels(collectionNode);
+        level--;
+        return null;
+    }
+    /** Flushes the union's definition levels at the current level, visits every branch, then resets the node. */
+    @Override
+    public Void visit(UnionSchemaNode unionNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+        columnSchemaMetadata.flushDefinitionLevels(level, arg, unionNode);
+        for (AbstractSchemaNode node : unionNode.getChildren().values()) {
+            node.accept(this, unionNode);
+        }
+        unionNode.setCounter(0);
+        columnSchemaMetadata.clearDefinitionLevels(unionNode);
+        return null;
+    }
+    /** Flushes the column's definition levels and queues its writer unless the column is a primary key. */
+    @Override
+    public Void visit(PrimitiveSchemaNode primitiveNode, AbstractSchemaNestedNode arg) throws HyracksDataException {
+        columnSchemaMetadata.flushDefinitionLevels(level, arg, primitiveNode);
+        if (!primitiveNode.isPrimaryKey()) {
+            orderedColumns.add(columnSchemaMetadata.getWriter(primitiveNode.getColumnIndex()));
+        }
+
+        //Prepare for the next batch
+        primitiveNode.setCounter(0);
+        return null;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
new file mode 100644
index 0000000000..48cd442c09
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/ColumnTransformer.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNestedNode;
+import org.apache.asterix.column.metadata.schema.AbstractSchemaNode;
+import org.apache.asterix.column.metadata.schema.ObjectSchemaNode;
+import org.apache.asterix.column.metadata.schema.UnionSchemaNode;
+import org.apache.asterix.column.metadata.schema.collection.AbstractCollectionSchemaNode;
+import org.apache.asterix.column.metadata.schema.primitive.PrimitiveSchemaNode;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.om.lazy.AbstractLazyVisitablePointable;
+import org.apache.asterix.om.lazy.AbstractListLazyVisitablePointable;
+import org.apache.asterix.om.lazy.FlatLazyVisitablePointable;
+import org.apache.asterix.om.lazy.ILazyVisitablePointableVisitor;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class ColumnTransformer implements ILazyVisitablePointableVisitor<AbstractSchemaNode, AbstractSchemaNode> {
+    private final FlushColumnMetadata columnMetadata;
+    private final VoidPointable nonTaggedValue;
+    private final ObjectSchemaNode root;
+    private AbstractSchemaNestedNode currentParent;
+    private int primaryKeysLength;
+
+    public ColumnTransformer(FlushColumnMetadata columnMetadata, ObjectSchemaNode root) {
+        this.root = root;
+        this.columnMetadata = columnMetadata;
+        this.nonTaggedValue = new VoidPointable();
+    }
+
+    /**
+     * Transforms a record in row format into columns by visiting it against the root schema node
+     *
+     * @param pointable the record to transform; it is visited with {@code root} as its schema node
+     * @return the estimated size (possibly overestimated) of the primary key(s) columns written by this call
+     */
+    public int transform(RecordLazyVisitablePointable pointable) throws HyracksDataException {
+        primaryKeysLength = 0;
+        pointable.accept(this, root);
+        return primaryKeysLength;
+    }
+
+    public int writeAntiMatter(LSMBTreeTupleReference tuple) throws HyracksDataException {
+        int estimatedKeysSize = 0;
+        for (int keyIndex = 0; keyIndex < columnMetadata.getNumberOfPrimaryKeys(); keyIndex++) {
+            byte[] fieldData = tuple.getFieldData(keyIndex);
+            int tagOffset = tuple.getFieldStart(keyIndex);
+            ATypeTag keyTypeTag = ATypeTag.VALUE_TYPE_MAPPING[fieldData[tagOffset]];
+            nonTaggedValue.set(fieldData, tagOffset + 1, tuple.getFieldLength(keyIndex) - 1);
+            IColumnValuesWriter keyWriter = columnMetadata.getWriter(keyIndex);
+            keyWriter.writeAntiMatter(keyTypeTag, nonTaggedValue);
+            estimatedKeysSize += keyWriter.getEstimatedSize();
+        }
+        return estimatedKeysSize;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(RecordLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+        //While visiting the fields, this object is their parent; the previous parent is restored at the end
+        ObjectSchemaNode objectNode = (ObjectSchemaNode) arg;
+        currentParent = objectNode;
+        for (int i = 0; i < pointable.getNumberOfChildren(); i++) {
+            pointable.nextChild();
+            IValueReference fieldName = pointable.getFieldName();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            if (childTypeTag != ATypeTag.MISSING) {
+                //Only write actual field values (including NULL) but ignore MISSING fields
+                AbstractSchemaNode childNode = objectNode.getOrCreateChild(fieldName, childTypeTag, columnMetadata);
+                acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            }
+        }
+        //All fields were visited: exit this node and restore the previous parent
+        columnMetadata.exitNode(arg);
+        currentParent = previousParent;
+        return null;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(AbstractListLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        AbstractSchemaNestedNode previousParent = currentParent;
+        //While visiting the items, this collection is their parent; the previous parent is restored at the end
+        AbstractCollectionSchemaNode collectionNode = (AbstractCollectionSchemaNode) arg;
+        RunLengthIntArray defLevels = columnMetadata.getDefinitionLevels(collectionNode);
+        //The level at which an item is missing; it is appended to defLevels after each visited item
+        int missingLevel = columnMetadata.getLevel();
+        currentParent = collectionNode;
+
+        int numberOfChildren = pointable.getNumberOfChildren();
+        for (int i = 0; i < numberOfChildren; i++) {
+            pointable.nextChild();
+            ATypeTag childTypeTag = pointable.getChildTypeTag();
+            AbstractSchemaNode childNode = collectionNode.getOrCreateItem(childTypeTag, columnMetadata);
+            acceptActualNode(pointable.getChildVisitablePointable(), childNode);
+            /*
+             * The array item may change (e.g., BIGINT --> UNION). Thus, new items would be considered as missing
+             */
+            defLevels.add(missingLevel);
+        }
+        //Exit the collection, passing the number of items that were visited
+        columnMetadata.exitCollectionNode(collectionNode, numberOfChildren);
+        currentParent = previousParent;
+        return null;
+    }
+
+    @Override
+    public AbstractSchemaNode visit(FlatLazyVisitablePointable pointable, AbstractSchemaNode arg)
+            throws HyracksDataException {
+        columnMetadata.enterNode(currentParent, arg);
+        ATypeTag valueTypeTag = pointable.getTypeTag();
+        PrimitiveSchemaNode primitiveNode = (PrimitiveSchemaNode) arg;
+        IColumnValuesWriter valueWriter = columnMetadata.getWriter(primitiveNode.getColumnIndex());
+        if (valueTypeTag == ATypeTag.MISSING) {
+            valueWriter.writeLevel(columnMetadata.getLevel());
+        } else if (valueTypeTag == ATypeTag.NULL) {
+            valueWriter.writeNull(columnMetadata.getLevel());
+        } else if (!pointable.isTagged()) {
+            valueWriter.writeValue(pointable.getTypeTag(), pointable);
+        } else {
+            //Skip the leading type tag byte; the tag is handed to the writer separately
+            nonTaggedValue.set(pointable.getByteArray(), pointable.getStartOffset() + 1, pointable.getLength() - 1);
+            valueWriter.writeValue(pointable.getTypeTag(), nonTaggedValue);
+        }
+        if (primitiveNode.isPrimaryKey()) {
+            primaryKeysLength += valueWriter.getEstimatedSize();
+        }
+        columnMetadata.exitNode(arg);
+        return null;
+    }
+
+    private void acceptActualNode(AbstractLazyVisitablePointable pointable, AbstractSchemaNode node)
+            throws HyracksDataException {
+        if (node.getTypeTag() == ATypeTag.UNION) {
+            columnMetadata.enterNode(currentParent, node);
+            AbstractSchemaNestedNode previousParent = currentParent;
+            //The union is the parent of whichever of its children is visited with the value
+            UnionSchemaNode unionNode = (UnionSchemaNode) node;
+            currentParent = unionNode;
+            //NULL/MISSING use the union's original type node; other values use the child matching their type tag
+            ATypeTag childTypeTag = pointable.getTypeTag();
+            AbstractSchemaNode actualNode;
+            if (childTypeTag == ATypeTag.NULL || childTypeTag == ATypeTag.MISSING) {
+                actualNode = unionNode.getOriginalType();
+            } else {
+                actualNode = unionNode.getOrCreateChild(pointable.getTypeTag(), columnMetadata);
+            }
+            pointable.accept(this, actualNode);
+
+            currentParent = previousParent;
+            columnMetadata.exitNode(node);
+        } else if (pointable.getTypeTag() == ATypeTag.NULL && node.isNested()) {
+            columnMetadata.addNestedNull(currentParent, (AbstractSchemaNestedNode) node);
+        } else {
+            pointable.accept(this, node);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..9b1b0a20e9
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleReaderWriterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+public class FlushColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -9197679192729634493L;
+
+    public FlushColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        FlushColumnMetadata flushMetadata = (FlushColumnMetadata) columnMetadata;
+        if (flushMetadata.getMetaType() != null) {
+            //A meta type is declared: use the writer that also writes the meta part
+            return new FlushColumnTupleWithMetaWriter(flushMetadata, pageSize, maxNumberOfTuples, tolerance);
+        }
+        return new FlushColumnTupleWriter(flushMetadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+        return ((AbstractColumnImmutableReadMetadata) columnProjectionInfo).createTupleReader();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
new file mode 100644
index 0000000000..9c527daa0e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWithMetaWriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class FlushColumnTupleWithMetaWriter extends FlushColumnTupleWriter {
+    private final ColumnTransformer metaColumnTransformer;
+    private final RecordLazyVisitablePointable metaPointable;
+
+    public FlushColumnTupleWithMetaWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+        this.metaColumnTransformer = new ColumnTransformer(columnMetadata, columnMetadata.getMetaRoot());
+        this.metaPointable = new TypedRecordLazyVisitablePointable(columnMetadata.getMetaType());
+    }
+
+    @Override
+    protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+        //No meta part is written for anti-matter tuples
+        if (!btreeTuple.isAntimatter()) {
+            int metaIndex = columnMetadata.getMetaRecordFieldIndex();
+            metaPointable.set(btreeTuple.getFieldData(metaIndex), btreeTuple.getFieldStart(metaIndex),
+                    btreeTuple.getFieldLength(metaIndex));
+            //In case the primary key is not in the meta part, we take the maximum
+            int metaKeysSize = metaColumnTransformer.transform(metaPointable);
+            primaryKeysEstimatedSize = Math.max(metaKeysSize, primaryKeysEstimatedSize);
+        }
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
new file mode 100644
index 0000000000..1af043fb4e
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/flush/FlushColumnTupleWriter.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.flush;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.asterix.om.lazy.RecordLazyVisitablePointable;
+import org.apache.asterix.om.lazy.TypedRecordLazyVisitablePointable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.tuples.LSMBTreeTupleReference;
+
+public class FlushColumnTupleWriter extends AbstractColumnTupleWriter {
+    protected final FlushColumnMetadata columnMetadata;
+    protected final BatchFinalizerVisitor finalizer;
+    protected final ColumnBatchWriter writer;
+
+    private final ColumnTransformer transformer;
+    private final RecordLazyVisitablePointable pointable;
+    private final int maxNumberOfTuples;
+
+    protected int primaryKeysEstimatedSize;
+
+    public FlushColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        this.columnMetadata = columnMetadata;
+        this.maxNumberOfTuples = maxNumberOfTuples;
+        this.transformer = new ColumnTransformer(columnMetadata, columnMetadata.getRoot());
+        this.finalizer = new BatchFinalizerVisitor(columnMetadata);
+        this.writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        this.pointable = new TypedRecordLazyVisitablePointable(columnMetadata.getDatasetType());
+    }
+
+    @Override
+    public final void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        columnMetadata.init(multiPageOp);
+    }
+
+    @Override
+    public final int getNumberOfColumns() {
+        return columnMetadata.getNumberOfColumns();
+    }
+
+    @Override
+    public final int bytesRequired(ITupleReference tuple) {
+        //Sums the field lengths of the primary keys only.
+        //Mostly it is an overestimated size
+        int keysLength = 0;
+        for (int keyIndex = 0; keyIndex < columnMetadata.getNumberOfPrimaryKeys(); keyIndex++) {
+            keysLength += tuple.getFieldLength(keyIndex);
+        }
+        return keysLength;
+    }
+
+    @Override
+    public final int getOccupiedSpace() {
+        //Estimated size of the primary keys plus FILTER_SIZE bytes for every column
+        int filtersSize = getNumberOfColumns() * AbstractColumnFilterWriter.FILTER_SIZE;
+        return primaryKeysEstimatedSize + filtersSize;
+    }
+
+    @Override
+    public final int getMaxNumberOfTuples() {
+        return maxNumberOfTuples;
+    }
+
+    @Override
+    public final void close() {
+        columnMetadata.close();
+    }
+
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        //The tuple comes from an in-memory component, hence the cast
+        LSMBTreeTupleReference btreeTuple = (LSMBTreeTupleReference) tuple;
+        if (!btreeTuple.isAntimatter()) {
+            writeRecord(tuple);
+            writeMeta(btreeTuple);
+            return;
+        }
+        //Write only the primary keys of an anti-matter tuple
+        primaryKeysEstimatedSize = transformer.writeAntiMatter(btreeTuple);
+    }
+
+    @Override
+    public final int flush(ByteBuffer pageZero) throws HyracksDataException {
+        writer.setPageZeroBuffer(pageZero, getNumberOfColumns(), columnMetadata.getNumberOfPrimaryKeys());
+        return finalizer.finalizeBatch(writer, columnMetadata);
+    }
+
+    protected void writeRecord(ITupleReference tuple) throws HyracksDataException {
+        int recordIndex = columnMetadata.getRecordFieldIndex();
+        pointable.set(tuple.getFieldData(recordIndex), tuple.getFieldStart(recordIndex),
+                tuple.getFieldLength(recordIndex));
+        primaryKeysEstimatedSize = transformer.transform(pointable);
+    }
+
+    protected void writeMeta(LSMBTreeTupleReference btreeTuple) throws HyracksDataException {
+        //NoOp: this writer has no meta part; FlushColumnTupleWithMetaWriter overrides this method
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..0c1990f479
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleReaderWriterFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.load;
+
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+
+public class LoadColumnTupleReaderWriterFactory extends FlushColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -7583574057314353873L;
+
+    public LoadColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        return new LoadColumnTupleWriter((FlushColumnMetadata) columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
similarity index 56%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
index 8ca1a82541..e4604da849 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/load/LoadColumnTupleWriter.java
@@ -16,14 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.operation.lsm.load;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnTupleWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+public class LoadColumnTupleWriter extends FlushColumnTupleWriter {
+    public LoadColumnTupleWriter(FlushColumnMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        super(columnMetadata, pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        writeRecord(tuple);
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
similarity index 50%
copy from hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
copy to asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
index 8ca1a82541..93df021bb1 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/IEndOfPageCallBack.java
@@ -16,14 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.hyracks.storage.common.projection;
+package org.apache.asterix.column.operation.lsm.merge;
 
-import java.io.DataOutput;
-import java.io.IOException;
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeRangeSearchCursor;
 
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
-
-public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+/**
+ * A callback used to signal {@link MergeColumnTupleWriter} that a component's page has reached its end.
+ */
+@FunctionalInterface
+public interface IEndOfPageCallBack {
+    /**
+     * Calls {@link MergeColumnTupleWriter} to finish the current "vertical" merging batch.
+     * The caller of this method is {@link MergeColumnTupleReference#lastTupleReached()}.
+     *
+     * @see ColumnBTreeRangeSearchCursor#doHasNext()
+     */
+    void callEnd(MergeColumnTupleReference columnTuple) throws HyracksDataException;
 }
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
new file mode 100644
index 0000000000..11f3059504
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnReadMetadata.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Merge column read metadata is used to read an {@link ILSMDiskComponent}.
+ * It is only for reading an existing on-disk component during a merge operation. The schema here is immutable
+ * and cannot be changed.
+ */
+public final class MergeColumnReadMetadata extends AbstractColumnImmutableReadMetadata {
+    private final IColumnValuesReader[] columnReaders;
+
+    private MergeColumnReadMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReader[] columnReaders, IValueReference serializedMetadata) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnReaders.length);
+        this.columnReaders = columnReaders;
+    }
+
+    /**
+     * Creates a {@link MergeColumnReadMetadata} from the serialized column metadata
+     *
+     * @param serializedMetadata the serialized column metadata; its path info is located via PATH_INFO_POINTER
+     * @return {@link MergeColumnReadMetadata} holding one column reader per serialized column
+     * @see FlushColumnMetadata#serializeColumnsMetadata() for more information about serialization order
+     */
+    public static MergeColumnReadMetadata create(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReaderFactory readerFactory, IValueReference serializedMetadata) throws IOException {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int offset = serializedMetadata.getStartOffset();
+        int pathInfoStart = offset + IntegerPointable.getInteger(bytes, offset + PATH_INFO_POINTER);
+        //Readable bytes end where the serialized metadata ends, so the length is counted from pathInfoStart
+        int pathInfoLength = offset + serializedMetadata.getLength() - pathInfoStart;
+        DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, pathInfoStart, pathInfoLength));
+        int numberOfColumns = input.readInt();
+        IColumnValuesReader[] columnReaders = new IColumnValuesReader[numberOfColumns];
+        for (int i = 0; i < numberOfColumns; i++) {
+            IColumnValuesReader columnReader = readerFactory.createValueReader(input);
+            //Path infos are not serialized in column-index order, so each reader is placed by its own index
+            columnReaders[columnReader.getColumnIndex()] = columnReader;
+        }
+
+        return new MergeColumnReadMetadata(datasetType, metaType, numberOfPrimaryKeys, columnReaders,
+                serializedMetadata);
+    }
+
+    public IColumnValuesReader[] getColumnReaders() {
+        return columnReaders;
+    }
+
+    @Override
+    public int getColumnIndex(int ordinal) {
+        return ordinal;
+    }
+
+    @Override
+    public int getNumberOfProjectedColumns() {
+        return columnReaders.length;
+    }
+
+    @Override
+    public AbstractColumnTupleReader createTupleReader() {
+        return new MergeColumnTupleReader(this);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
new file mode 100644
index 0000000000..f03506efa3
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleProjector.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.asterix.column.values.IColumnValuesReaderFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnTupleProjector;
+
+/**
+ * Projector used by LSM merge. It only produces the projection info (a {@link MergeColumnReadMetadata}) from the
+ * serialized column metadata of a component; projecting individual tuples is not supported.
+ */
+public class MergeColumnTupleProjector implements IColumnTupleProjector {
+    private final ARecordType datasetType;
+    private final ARecordType metaType;
+    private final int numberOfPrimaryKeys;
+    private final IColumnValuesReaderFactory readerFactory;
+
+    /**
+     * @param datasetType         dataset's declared record type
+     * @param metaType            meta record type
+     * @param numberOfPrimaryKeys number of primary keys
+     * @param readerFactory       factory handed to {@link MergeColumnReadMetadata} to create the column readers
+     */
+    public MergeColumnTupleProjector(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            IColumnValuesReaderFactory readerFactory) {
+        this.readerFactory = readerFactory;
+        this.numberOfPrimaryKeys = numberOfPrimaryKeys;
+        this.metaType = metaType;
+        this.datasetType = datasetType;
+    }
+
+    /**
+     * Builds the merge read metadata out of the serialized column metadata.
+     *
+     * @param columnMetadata serialized column metadata
+     * @return the created {@link MergeColumnReadMetadata}
+     * @throws HyracksDataException wrapping any {@link IOException} raised while reading the metadata
+     */
+    @Override
+    public IColumnProjectionInfo createProjectionInfo(IValueReference columnMetadata) throws HyracksDataException {
+        final IColumnProjectionInfo projectionInfo;
+        try {
+            projectionInfo = MergeColumnReadMetadata.create(datasetType, metaType, numberOfPrimaryKeys,
+                    readerFactory, columnMetadata);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return projectionInfo;
+    }
+
+    /**
+     * Not supported by this projector: always throws {@link IllegalAccessError} carrying this class's name.
+     */
+    @Override
+    public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+        String projectorName = getClass().getName();
+        throw new IllegalAccessError(projectorName);
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
new file mode 100644
index 0000000000..4114f1076b
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+/**
+ * Tuple reader used during LSM merge. It creates {@link MergeColumnTupleReference} iterators that share the same
+ * {@link MergeColumnReadMetadata}.
+ */
+public class MergeColumnTupleReader extends AbstractColumnTupleReader {
+    private final MergeColumnReadMetadata columnMetadata;
+
+    /**
+     * @param columnMetadata read metadata; must be a {@link MergeColumnReadMetadata} (it is cast unconditionally)
+     */
+    public MergeColumnTupleReader(AbstractColumnImmutableReadMetadata columnMetadata) {
+        MergeColumnReadMetadata mergeReadMetadata = (MergeColumnReadMetadata) columnMetadata;
+        this.columnMetadata = mergeReadMetadata;
+    }
+
+    /**
+     * Creates a merge tuple iterator over the given leaf frame.
+     *
+     * @param frame          leaf frame to read from
+     * @param componentIndex index of the component the frame belongs to
+     * @param multiPageOp    multi-page read operation passed on to the iterator
+     * @return a new {@link MergeColumnTupleReference}
+     */
+    @Override
+    public IColumnTupleIterator createTupleIterator(ColumnBTreeReadLeafFrame frame, int componentIndex,
+            IColumnReadMultiPageOp multiPageOp) {
+        IColumnTupleIterator mergeIterator =
+                new MergeColumnTupleReference(componentIndex, frame, columnMetadata, multiPageOp);
+        return mergeIterator;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
new file mode 100644
index 0000000000..1ac94fe4cc
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleReaderWriterFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableReadMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReader;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleReaderWriterFactory;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnMetadata;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+
+/**
+ * Factory of the column tuple reader and writer used by LSM merge.
+ */
+public class MergeColumnTupleReaderWriterFactory extends AbstractColumnTupleReaderWriterFactory {
+    private static final long serialVersionUID = -2131401304338796428L;
+
+    /**
+     * The three arguments are passed to the parent factory unchanged.
+     */
+    public MergeColumnTupleReaderWriterFactory(int pageSize, int maxNumberOfTuples, float tolerance) {
+        super(pageSize, maxNumberOfTuples, tolerance);
+    }
+
+    /**
+     * @param columnMetadata write metadata; must be a {@link MergeColumnWriteMetadata} (it is cast unconditionally)
+     * @return a new {@link MergeColumnTupleWriter} configured with this factory's page size, maximum number of
+     *         tuples, and tolerance
+     */
+    @Override
+    public AbstractColumnTupleWriter createColumnWriter(IColumnMetadata columnMetadata) {
+        return new MergeColumnTupleWriter((MergeColumnWriteMetadata) columnMetadata, pageSize, maxNumberOfTuples,
+                tolerance);
+    }
+
+    /**
+     * @param columnProjectionInfo projection info; must be an {@link AbstractColumnImmutableReadMetadata}
+     * @return the tuple reader created by the given read metadata
+     */
+    @Override
+    public AbstractColumnTupleReader createColumnReader(IColumnProjectionInfo columnProjectionInfo) {
+        AbstractColumnImmutableReadMetadata readMetadata = (AbstractColumnImmutableReadMetadata) columnProjectionInfo;
+        return readMetadata.createTupleReader();
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
new file mode 100644
index 0000000000..fbda6d000c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnTupleWriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.asterix.column.tuple.MergeColumnTupleReference;
+import org.apache.asterix.column.util.RunLengthIntArray;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.writer.ColumnBatchWriter;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.AbstractColumnTupleWriter;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+
+/**
+ * Writes the columns of the new on-disk component produced by an LSM merge.
+ * <p>
+ * Primary keys are written tuple-by-tuple in {@link #writeTuple(ITupleReference)}. The values of the non-key columns
+ * are copied lazily: {@code writtenComponents} records (in run-length form) which component each tuple came from,
+ * and the values are copied in batches on {@link #flush(ByteBuffer)} or when one of the merged components reports
+ * the end of its page (see {@link #writeAllColumns(MergeColumnTupleReference)}).
+ * <p>
+ * An entry of {@code writtenComponents} is either a component index (the values must be copied) or a component
+ * index carrying the antimatter indicator (the values must be skipped), see {@link #setAntimatterIndicator(int)}.
+ */
+public class MergeColumnTupleWriter extends AbstractColumnTupleWriter {
+    private final MergeColumnWriteMetadata columnMetadata;
+    private final MergeColumnTupleReference[] componentsTuples;
+    private final RunLengthIntArray writtenComponents;
+
+    private final IColumnValuesWriter[] primaryKeyWriters;
+    private final PriorityQueue<IColumnValuesWriter> orderedColumns;
+    private final ColumnBatchWriter writer;
+    private final int maxNumberOfTuples;
+    private int primaryKeysEstimatedSize;
+
+    /**
+     * @param columnMetadata    merge write metadata (provides the column writers and the merged components' tuples)
+     * @param pageSize          page size handed to the {@link ColumnBatchWriter}
+     * @param maxNumberOfTuples value returned by {@link #getMaxNumberOfTuples()}
+     * @param tolerance         tolerance handed to the {@link ColumnBatchWriter}
+     */
+    public MergeColumnTupleWriter(MergeColumnWriteMetadata columnMetadata, int pageSize, int maxNumberOfTuples,
+            float tolerance) {
+        this.columnMetadata = columnMetadata;
+        List<IColumnTupleIterator> componentsTuplesList = columnMetadata.getComponentsTuples();
+        this.componentsTuples = new MergeColumnTupleReference[componentsTuplesList.size()];
+        for (int i = 0; i < componentsTuplesList.size(); i++) {
+            MergeColumnTupleReference mergeTuple = (MergeColumnTupleReference) componentsTuplesList.get(i);
+            this.componentsTuples[i] = mergeTuple;
+            //Get notified when a component reaches the end of its page to write the pending non-key values
+            mergeTuple.registerEndOfPageCallBack(this::writeAllColumns);
+        }
+        this.writtenComponents = new RunLengthIntArray();
+        this.maxNumberOfTuples = maxNumberOfTuples;
+        writer = new ColumnBatchWriter(columnMetadata.getMultiPageOpRef(), pageSize, tolerance);
+        writtenComponents.reset();
+
+        primaryKeyWriters = new IColumnValuesWriter[columnMetadata.getNumberOfPrimaryKeys()];
+        for (int i = 0; i < primaryKeyWriters.length; i++) {
+            primaryKeyWriters[i] = columnMetadata.getWriter(i);
+        }
+        //Largest estimated size first
+        orderedColumns = new PriorityQueue<>(Comparator.comparingInt(x -> -x.getEstimatedSize()));
+    }
+
+    /**
+     * @return the sum of the lengths of the tuple's primary key fields (non-key columns are not accounted for)
+     */
+    @Override
+    public int bytesRequired(ITupleReference tuple) {
+        int primaryKeysSize = 0;
+        for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+            primaryKeysSize += tuple.getFieldLength(i);
+        }
+
+        return primaryKeysSize;
+    }
+
+    @Override
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        columnMetadata.init(multiPageOp);
+    }
+
+    @Override
+    public int getNumberOfColumns() {
+        return columnMetadata.getNumberOfColumns();
+    }
+
+    @Override
+    public int getMaxNumberOfTuples() {
+        return maxNumberOfTuples;
+    }
+
+    /**
+     * @return the estimated size of the primary keys written so far plus one filter entry per column
+     */
+    @Override
+    public int getOccupiedSpace() {
+        int numberOfColumns = getNumberOfColumns();
+        int filterSize = numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE;
+        return primaryKeysEstimatedSize + filterSize;
+    }
+
+    /**
+     * Writes the primary keys of the tuple and records which component the tuple came from, so the values of the
+     * non-key columns can be copied (or skipped) later.
+     *
+     * @param tuple must be a {@link MergeColumnTupleReference}
+     */
+    @Override
+    public void writeTuple(ITupleReference tuple) throws HyracksDataException {
+        MergeColumnTupleReference columnTuple = (MergeColumnTupleReference) tuple;
+        int componentIndex = columnTuple.getComponentIndex();
+        int skipCount = columnTuple.getAndResetSkipCount();
+        if (skipCount > 0) {
+            //Tuples of this component that were skipped before reaching this tuple
+            writtenComponents.add(setAntimatterIndicator(componentIndex), skipCount);
+        }
+        if (columnTuple.isAntimatter()) {
+            //The non-key values of an antimatter tuple are not written
+            writtenComponents.add(setAntimatterIndicator(componentIndex));
+        } else {
+            writtenComponents.add(componentIndex);
+        }
+        writePrimaryKeys(columnTuple);
+    }
+
+    /**
+     * Copies the current primary key values of the tuple to the primary key writers and refreshes
+     * {@code primaryKeysEstimatedSize} with the writers' estimated sizes.
+     */
+    private void writePrimaryKeys(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+        int primaryKeySize = 0;
+        for (int i = 0; i < columnMetadata.getNumberOfPrimaryKeys(); i++) {
+            IColumnValuesReader columnReader = columnTuple.getReader(i);
+            IColumnValuesWriter columnWriter = primaryKeyWriters[i];
+            columnReader.write(columnWriter, false);
+            primaryKeySize += columnWriter.getEstimatedSize();
+        }
+        primaryKeysEstimatedSize = primaryKeySize;
+    }
+
+    /**
+     * Goes over the blocks recorded in {@code writtenComponents} in order and, for each block, either copies the
+     * non-key values from the block's component or skips them if the block carries the antimatter indicator.
+     */
+    private void writeNonKeyColumns() throws HyracksDataException {
+        for (int i = 0; i < writtenComponents.getNumberOfBlocks(); i++) {
+            int componentIndex = writtenComponents.getBlockValue(i);
+            if (componentIndex < 0) {
+                //Skip writing values of deleted tuples
+                componentIndex = clearAntimatterIndicator(componentIndex);
+                skipReaders(componentIndex, writtenComponents.getBlockSize(i));
+                continue;
+            }
+            MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
+            int count = writtenComponents.getBlockSize(i);
+            for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+                IColumnValuesReader columnReader = componentTuple.getReader(j);
+                IColumnValuesWriter columnWriter = columnMetadata.getWriter(j);
+                columnReader.write(columnWriter, count);
+            }
+        }
+    }
+
+    /**
+     * Advances the non-key column readers of the given component by {@code count} without writing anything.
+     */
+    private void skipReaders(int componentIndex, int count) throws HyracksDataException {
+        MergeColumnTupleReference componentTuple = componentsTuples[componentIndex];
+        for (int j = columnMetadata.getNumberOfPrimaryKeys(); j < columnMetadata.getNumberOfColumns(); j++) {
+            IColumnValuesReader columnReader = componentTuple.getReader(j);
+            columnReader.skip(count);
+        }
+    }
+
+    /**
+     * Writes any pending non-key values, then writes the primary key columns followed by the non-key columns.
+     *
+     * @param pageZero page zero buffer
+     * @return the sum of the values returned by writing the primary key columns and the non-key columns
+     */
+    @Override
+    public int flush(ByteBuffer pageZero) throws HyracksDataException {
+        int numberOfColumns = columnMetadata.getNumberOfColumns();
+        int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+        if (writtenComponents.getSize() > 0) {
+            writeNonKeyColumns();
+            writtenComponents.reset();
+        }
+        for (int i = numberOfPrimaryKeys; i < numberOfColumns; i++) {
+            orderedColumns.add(columnMetadata.getWriter(i));
+        }
+        writer.setPageZeroBuffer(pageZero, numberOfColumns, numberOfPrimaryKeys);
+        int allocatedSpace = writer.writePrimaryKeyColumns(primaryKeyWriters);
+        allocatedSpace += writer.writeColumns(orderedColumns);
+        return allocatedSpace;
+    }
+
+    @Override
+    public void close() {
+        columnMetadata.close();
+    }
+
+    private void writeAllColumns(MergeColumnTupleReference columnTuple) throws HyracksDataException {
+        /*
+         * The last tuple from one of the components was reached. Since we are going to the next leaf, we will not be
+         * able to access the readers of this component's leaf after this tuple. So, we are going to write
+         * the values of all columns as recorded in writtenComponents
+         */
+        int skipCount = columnTuple.getAndResetSkipCount();
+        if (skipCount > 0) {
+            writtenComponents.add(setAntimatterIndicator(columnTuple.getComponentIndex()), skipCount);
+        }
+        writeNonKeyColumns();
+        writtenComponents.reset();
+    }
+
+    /**
+     * Marks a component index as "skip the values". A plain negation cannot be used as the indicator because
+     * {@code -0 == 0}: skipped/antimatter tuples of the component at index 0 would be indistinguishable from tuples
+     * whose values must be written. Shifting by one before negating guarantees a strictly negative result.
+     *
+     * @param componentIndex non-negative component index
+     * @return a strictly negative encoding of {@code componentIndex}
+     */
+    private static int setAntimatterIndicator(int componentIndex) {
+        return -(componentIndex + 1);
+    }
+
+    /**
+     * Inverse of {@link #setAntimatterIndicator(int)}.
+     *
+     * @param encodedComponentIndex a value produced by {@link #setAntimatterIndicator(int)}
+     * @return the original (non-negative) component index
+     */
+    private static int clearAntimatterIndicator(int encodedComponentIndex) {
+        return -encodedComponentIndex - 1;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
new file mode 100644
index 0000000000..b0d1a01015
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/operation/lsm/merge/MergeColumnWriteMetadata.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.operation.lsm.merge;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.column.metadata.AbstractColumnImmutableMetadata;
+import org.apache.asterix.column.operation.lsm.flush.FlushColumnMetadata;
+import org.apache.asterix.column.values.IColumnValuesWriter;
+import org.apache.asterix.column.values.IColumnValuesWriterFactory;
+import org.apache.asterix.column.values.writer.ColumnValuesWriterFactory;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnWriteMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+
+/**
+ * Merge column write metadata is used to write a new merged {@link ILSMDiskComponent}.
+ * This is for writing a new on-disk component by merging two or more on-disk components. The final schema for this
+ * component will be the most recent schema, which belongs to the newest merged component. The schema here is
+ * immutable and cannot be changed.
+ */
+public final class MergeColumnWriteMetadata extends AbstractColumnImmutableMetadata {
+    private final Mutable<IColumnWriteMultiPageOp> multiPageOpRef;
+    private final List<IColumnValuesWriter> columnWriters;
+    private final List<IColumnTupleIterator> componentsTuples;
+
+    /**
+     * For LSM Merge
+     */
+    private MergeColumnWriteMetadata(ARecordType datasetType, ARecordType metaType, int numberOfPrimaryKeys,
+            Mutable<IColumnWriteMultiPageOp> multiPageOpRef, List<IColumnValuesWriter> columnWriters,
+            IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) {
+        super(datasetType, metaType, numberOfPrimaryKeys, serializedMetadata, columnWriters.size());
+        this.multiPageOpRef = multiPageOpRef;
+        this.columnWriters = columnWriters;
+        this.componentsTuples = componentsTuples;
+    }
+
+    /**
+     * Set {@link IColumnWriteMultiPageOp} for {@link IColumnValuesWriter}
+     *
+     * @param multiPageOp multi-buffer allocator
+     */
+    public void init(IColumnWriteMultiPageOp multiPageOp) throws HyracksDataException {
+        multiPageOpRef.setValue(multiPageOp);
+
+        //Reset writer for the first write
+        for (int i = 0; i < columnWriters.size(); i++) {
+            columnWriters.get(i).reset();
+        }
+    }
+
+    /**
+     * @return the mutable reference of the multi-page operation shared with the column writers
+     */
+    public Mutable<IColumnWriteMultiPageOp> getMultiPageOpRef() {
+        return multiPageOpRef;
+    }
+
+    /**
+     * @param columnIndex position of the writer in the writers' list
+     * @return the writer at {@code columnIndex}
+     */
+    public IColumnValuesWriter getWriter(int columnIndex) {
+        return columnWriters.get(columnIndex);
+    }
+
+    /**
+     * Clears the multi-page operation reference and closes all column writers
+     */
+    public void close() {
+        multiPageOpRef.setValue(null);
+        for (int i = 0; i < columnWriters.size(); i++) {
+            columnWriters.get(i).close();
+        }
+    }
+
+    /**
+     * Creates the merge write metadata from the serialized column metadata.
+     * <p>
+     * The writers' section is located through the integer stored at {@code WRITERS_POINTER} (relative to the start
+     * of the serialized metadata) and is deserialized using {@link FlushColumnMetadata}.
+     *
+     * @param datasetType         dataset's declared record type
+     * @param metaType            meta record type
+     * @param numberOfPrimaryKeys number of primary keys
+     * @param multiPageOpRef      multi-page operation reference handed to the created writers
+     * @param serializedMetadata  the serialized column metadata
+     * @param componentsTuples    tuples of the components being merged
+     * @return the merge write metadata
+     * @throws IOException if the writers cannot be deserialized (e.g., the serialized metadata ends prematurely)
+     */
+    public static MergeColumnWriteMetadata create(ARecordType datasetType, ARecordType metaType,
+            int numberOfPrimaryKeys, Mutable<IColumnWriteMultiPageOp> multiPageOpRef,
+            IValueReference serializedMetadata, List<IColumnTupleIterator> componentsTuples) throws IOException {
+        byte[] bytes = serializedMetadata.getByteArray();
+        int offset = serializedMetadata.getStartOffset();
+        int length = serializedMetadata.getLength();
+
+        int writersOffset = offset + IntegerPointable.getInteger(bytes, offset + WRITERS_POINTER);
+        /*
+         * The length given to ByteArrayInputStream is relative to writersOffset. Passing the full metadata length
+         * would let the stream read past the end of the serialized metadata, so only the remaining bytes are exposed
+         */
+        int writersLength = offset + length - writersOffset;
+        DataInput input = new DataInputStream(new ByteArrayInputStream(bytes, writersOffset, writersLength));
+
+        IColumnValuesWriterFactory writerFactory = new ColumnValuesWriterFactory(multiPageOpRef);
+        List<IColumnValuesWriter> writers = new ArrayList<>();
+        FlushColumnMetadata.deserializeWriters(input, writers, writerFactory);
+
+        return new MergeColumnWriteMetadata(datasetType, metaType, numberOfPrimaryKeys, multiPageOpRef, writers,
+                serializedMetadata, componentsTuples);
+    }
+
+    /**
+     * @return the tuples of the components being merged, as given at creation
+     */
+    public List<IColumnTupleIterator> getComponentsTuples() {
+        return componentsTuples;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
new file mode 100644
index 0000000000..df6b554111
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/AbstractAsterixColumnTupleReference.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import org.apache.asterix.column.assembler.value.IValueGetter;
+import org.apache.asterix.column.assembler.value.ValueGetterFactory;
+import org.apache.asterix.column.bytes.stream.in.AbstractBytesInputStream;
+import org.apache.asterix.column.bytes.stream.in.ByteBufferInputStream;
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnTupleIterator;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.lsm.tuples.AbstractColumnTupleReference;
+
+/**
+ * Base class of the columnar tuple references. As a tuple, it exposes only the primary keys: each call to
+ * {@link #onNext()} advances the primary key readers and points {@code primaryKeys} at the current values.
+ * It also allocates one input stream per projected column for the subclasses to use.
+ */
+public abstract class AbstractAsterixColumnTupleReference extends AbstractColumnTupleReference {
+    private final IValueGetter[] primaryKeysValueGetters;
+    protected final ByteBufferInputStream[] primaryKeyStreams;
+    protected final IColumnValuesReader[] primaryKeyReaders;
+    protected final VoidPointable[] primaryKeys;
+    protected final AbstractBytesInputStream[] columnStreams;
+
+    /**
+     * Note: {@link #getPrimaryKeyReaders(IColumnProjectionInfo)} is invoked from this constructor, i.e., before the
+     * subclass's own fields are initialized.
+     */
+    protected AbstractAsterixColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+            IColumnProjectionInfo info, IColumnReadMultiPageOp multiPageOp) {
+        super(componentIndex, frame, info, multiPageOp);
+        primaryKeyReaders = getPrimaryKeyReaders(info);
+        final int keyCount = primaryKeyReaders.length;
+
+        primaryKeyStreams = new ByteBufferInputStream[keyCount];
+        primaryKeysValueGetters = new IValueGetter[keyCount];
+        primaryKeys = new VoidPointable[keyCount];
+        for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+            primaryKeyStreams[keyIdx] = new ByteBufferInputStream();
+            primaryKeysValueGetters[keyIdx] =
+                    ValueGetterFactory.INSTANCE.createValueGetter(primaryKeyReaders[keyIdx].getTypeTag());
+            primaryKeys[keyIdx] = new VoidPointable();
+        }
+
+        columnStreams = new AbstractBytesInputStream[info.getNumberOfProjectedColumns()];
+        for (int ordinal = 0; ordinal < columnStreams.length; ordinal++) {
+            //Primary key columns get a single-buffer stream; all other columns get a multi-buffer stream
+            boolean primaryKeyColumn = info.getColumnIndex(ordinal) < keyCount;
+            columnStreams[ordinal] =
+                    primaryKeyColumn ? new ByteBufferInputStream() : new MultiByteBufferInputStream();
+        }
+    }
+
+    /**
+     * @param info projection info given to the constructor
+     * @return the primary key readers, one per primary key
+     */
+    protected abstract IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info);
+
+    /**
+     * Resets the stream and the reader of the primary key at {@code ordinal}, then skips {@code startIndex} values.
+     */
+    @Override
+    protected final void startPrimaryKey(IColumnBufferProvider provider, int startIndex, int ordinal,
+            int numberOfTuples) throws HyracksDataException {
+        ByteBufferInputStream keyStream = primaryKeyStreams[ordinal];
+        IColumnValuesReader keyReader = primaryKeyReaders[ordinal];
+        keyStream.reset(provider);
+        keyReader.reset(keyStream, numberOfTuples);
+        keyReader.skip(startIndex);
+    }
+
+    /**
+     * Advances every primary key reader by one value and points the corresponding pointable at it.
+     */
+    @Override
+    protected final void onNext() throws HyracksDataException {
+        for (int keyIdx = 0; keyIdx < primaryKeys.length; keyIdx++) {
+            IColumnValuesReader keyReader = primaryKeyReaders[keyIdx];
+            keyReader.next();
+            IValueGetter valueGetter = primaryKeysValueGetters[keyIdx];
+            primaryKeys[keyIdx].set(valueGetter.getValue(keyReader));
+        }
+    }
+
+    @Override
+    public void lastTupleReached() throws HyracksDataException {
+        //Nothing to do by default
+    }
+
+    /**
+     * @return the number of primary keys (only the primary keys are exposed as fields)
+     */
+    @Override
+    public final int getFieldCount() {
+        return primaryKeys.length;
+    }
+
+    @Override
+    public final byte[] getFieldData(int fIdx) {
+        VoidPointable key = primaryKeys[fIdx];
+        return key.getByteArray();
+    }
+
+    @Override
+    public final int getFieldStart(int fIdx) {
+        VoidPointable key = primaryKeys[fIdx];
+        return key.getStartOffset();
+    }
+
+    @Override
+    public final int getFieldLength(int fIdx) {
+        VoidPointable key = primaryKeys[fIdx];
+        return key.getLength();
+    }
+
+    /**
+     * @return always -1
+     */
+    @Override
+    public final int getTupleSize() {
+        return -1;
+    }
+
+    @Override
+    public final boolean isAntimatter() {
+        /*
+         * A primary key is never missing on its own; a "missing" primary key means that the tuple itself is missing.
+         * For composite primary keys, checking a single primary key reader is enough to tell whether the tuple is
+         * an anti-matter tuple, so only the first reader is consulted.
+         */
+        IColumnValuesReader firstKeyReader = primaryKeyReaders[0];
+        return firstKeyReader.isMissing();
+    }
+
+    /**
+     * Compares the current primary keys of the two tuples, key by key, stopping at the first key that differs.
+     *
+     * @param o must be an {@link AbstractAsterixColumnTupleReference}
+     * @return the result of the first non-equal key comparison, or 0 if all keys are equal
+     */
+    @Override
+    public final int compareTo(IColumnTupleIterator o) {
+        AbstractAsterixColumnTupleReference other = (AbstractAsterixColumnTupleReference) o;
+        for (int keyIdx = 0; keyIdx < primaryKeys.length; keyIdx++) {
+            int result = primaryKeyReaders[keyIdx].compareTo(other.primaryKeyReaders[keyIdx]);
+            if (result != 0) {
+                return result;
+            }
+        }
+        return 0;
+    }
+}
diff --git a/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
new file mode 100644
index 0000000000..c10d41550c
--- /dev/null
+++ b/asterixdb/asterix-column/src/main/java/org/apache/asterix/column/tuple/MergeColumnTupleReference.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.column.tuple;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.column.bytes.stream.in.MultiByteBufferInputStream;
+import org.apache.asterix.column.operation.lsm.merge.IEndOfPageCallBack;
+import org.apache.asterix.column.operation.lsm.merge.MergeColumnReadMetadata;
+import org.apache.asterix.column.values.IColumnValuesReader;
+import org.apache.asterix.column.values.writer.filters.AbstractColumnFilterWriter;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnBufferProvider;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.IColumnReadMultiPageOp;
+import org.apache.hyracks.storage.am.lsm.btree.column.api.projection.IColumnProjectionInfo;
+import org.apache.hyracks.storage.am.lsm.btree.column.impls.btree.ColumnBTreeReadLeafFrame;
+
+public final class MergeColumnTupleReference extends AbstractAsterixColumnTupleReference {
+    private final IColumnValuesReader[] columnReaders; // readers from the merge metadata; primary key readers first
+    private int skipCount; // skips accumulated by skip() since the last reset
+    private IEndOfPageCallBack endOfPageCallBack; // unset until registerEndOfPageCallBack() is called
+
+    public MergeColumnTupleReference(int componentIndex, ColumnBTreeReadLeafFrame frame,
+            MergeColumnReadMetadata columnMetadata, IColumnReadMultiPageOp multiPageOp) {
+        super(componentIndex, frame, columnMetadata, multiPageOp);
+        this.columnReaders = columnMetadata.getColumnReaders();
+    }
+
+    @Override
+    protected IColumnValuesReader[] getPrimaryKeyReaders(IColumnProjectionInfo info) {
+        MergeColumnReadMetadata columnMetadata = (MergeColumnReadMetadata) info;
+        int numberOfPrimaryKeys = columnMetadata.getNumberOfPrimaryKeys();
+        IColumnValuesReader[] primaryKeyReaders = new IColumnValuesReader[numberOfPrimaryKeys];
+        System.arraycopy(columnMetadata.getColumnReaders(), 0, primaryKeyReaders, 0, numberOfPrimaryKeys);
+        return primaryKeyReaders; // the first numberOfPrimaryKeys entries of the metadata's column readers
+    }
+
+    @Override
+    protected boolean startNewPage(ByteBuffer pageZero, int numberOfColumns, int numberOfTuples) {
+        //Skip filters: advance the buffer position by numberOfColumns * FILTER_SIZE bytes
+        pageZero.position(pageZero.position() + numberOfColumns * AbstractColumnFilterWriter.FILTER_SIZE);
+        skipCount = 0; // skips counted for a previous page do not carry over
+        return true; // NOTE(review): meaning of the flag is defined by the base class - confirm there
+    }
+
+    @Override
+    protected void startColumn(IColumnBufferProvider buffersProvider, int startIndex, int ordinal, int numberOfTuples)
+            throws HyracksDataException {
+        int numberOfPrimaryKeys = primaryKeys.length;
+        if (ordinal < numberOfPrimaryKeys) {
+            //Skip primary key columns; only the remaining columns are reset below
+            return;
+        }
+        MultiByteBufferInputStream columnStream = (MultiByteBufferInputStream) columnStreams[ordinal];
+        columnStream.reset(buffersProvider);
+        IColumnValuesReader reader = columnReaders[ordinal];
+        reader.reset(columnStream, numberOfTuples);
+        reader.skip(startIndex); // NOTE(review): presumably positions the reader at tuple startIndex - confirm
+    }
+
+    @Override
+    public void skip(int count) throws HyracksDataException {
+        skipCount += count; // only recorded here; the total is exposed via getAndResetSkipCount()
+    }
+
+    @Override
+    public void lastTupleReached() throws HyracksDataException {
+        endOfPageCallBack.callEnd(this); // no null check: a callback must have been registered beforehand
+    }
+
+    public int getAndResetSkipCount() {
+        int currentSkipCount = skipCount; // remember the value to return before clearing it
+        skipCount = 0;
+        return currentSkipCount;
+    }
+
+    public IColumnValuesReader getReader(int columnIndex) {
+        return columnReaders[columnIndex]; // indexes the same array obtained from the merge metadata
+    }
+
+    public void registerEndOfPageCallBack(IEndOfPageCallBack endOfPageCallBack) {
+        this.endOfPageCallBack = endOfPageCallBack; // replaces any previously registered callback
+    }
+}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
index 00cb0c5486..c63912bfa8 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/DefaultTupleProjector.java
@@ -32,10 +32,11 @@ class DefaultTupleProjector implements ITupleProjector {
     }
 
     @Override
-    public void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
+    public ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException {
         for (int i = 0; i < tuple.getFieldCount(); i++) {
             dos.write(tuple.getFieldData(i), tuple.getFieldStart(i), tuple.getFieldLength(i));
             tb.addFieldEndOffset();
         }
+        return tuple; // every field was copied above; the input tuple itself is returned unchanged
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
index 8ca1a82541..ba23e307ef 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/projection/ITupleProjector.java
@@ -25,5 +25,5 @@ import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
 
 public interface ITupleProjector {
-    void project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException;
+    ITupleReference project(ITupleReference tuple, DataOutput dos, ArrayTupleBuilder tb) throws IOException; // e.g., DefaultTupleProjector returns the input tuple
 }