You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2019/02/27 21:33:13 UTC

[asterixdb] branch master updated: [ASTERIXDB-2516][RT] prepare physical comparators for deep comparison

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3001098  [ASTERIXDB-2516][RT] prepare physical comparators for deep comparison
3001098 is described below

commit 300109850876e4568748eb06971ad6abc4b42969
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Tue Feb 26 17:28:28 2019 -0800

    [ASTERIXDB-2516][RT] prepare physical comparators for deep comparison
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    This change is to make physical comparators type-aware in order to do
    deep comparison of complex types like arrays and records. The IAType
    is propagated to the comparators.
    - added new methods in IBinaryComparatorFactoryProvider to accept the
    type of left and right inputs for operations like hash join where
    the join key types come from different dataset sources.
    - defaulted some array functions to use the old comparator behaviour temporarily
    until complex comparison is implemented
    - modified AObjectAscBinaryComparatorFactory & AObjectDescBinaryComparatorFactory to
    create a comparator with IAType information. Changed the serialization/deserialization
    of their instances to take care of the newly added fields since they are not
    present in old instances.
    
    Change-Id: I02011e7151398d5f5f9ba9c1e1db6518484b9fe5
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3229
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Dmitry Lychagin <dm...@couchbase.com>
    Reviewed-by: Till Westmann <ti...@apache.org>
---
 .../comparators/AGenericAscBinaryComparator.java   | 319 +++++++++++++++++++
 .../comparators/AGenericDescBinaryComparator.java  |  47 +++
 .../AObjectAscBinaryComparatorFactory.java         | 336 +++------------------
 .../AObjectDescBinaryComparatorFactory.java        |  44 +--
 .../nontagged/BinaryComparatorFactoryProvider.java |  35 ++-
 .../org/apache/asterix/om/types/BuiltinType.java   |   2 +-
 .../std/RangeMapAggregateDescriptor.java           |  11 +-
 .../functions/AbstractArraySearchEval.java         |  17 +-
 .../functions/ArrayContainsDescriptor.java         |  17 +-
 .../functions/ArrayDistinctDescriptor.java         |   5 +-
 .../functions/ArrayIntersectDescriptor.java        |   5 +-
 .../functions/ArrayPositionDescriptor.java         |  16 +-
 .../evaluators/functions/ArrayPutDescriptor.java   |   5 +-
 .../functions/ArrayRemoveDescriptor.java           |   5 +-
 .../functions/ArrayReplaceDescriptor.java          |   5 +-
 .../evaluators/functions/ArraySortDescriptor.java  |   5 +-
 .../evaluators/functions/ArraySymDiffEval.java     |   5 +-
 .../evaluators/functions/ArrayUnionDescriptor.java |  10 +-
 .../physical/AbstractHashJoinPOperator.java        |   8 +-
 .../physical/HybridHashJoinPOperator.java          |  41 +--
 .../physical/InMemoryHashJoinPOperator.java        |  15 +-
 .../data/IBinaryComparatorFactoryProvider.java     |  48 ++-
 .../OptimizedHybridHashJoinOperatorDescriptor.java |  57 ++--
 .../integration/TPCHCustomerOrderHashJoinTest.java |   6 +
 .../apache/hyracks/examples/tpch/client/Join.java  |   1 +
 25 files changed, 624 insertions(+), 441 deletions(-)

diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericAscBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericAscBinaryComparator.java
new file mode 100644
index 0000000..1d20d72
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericAscBinaryComparator.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators;
+
+import java.io.IOException;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
+import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
+import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
+import org.apache.hyracks.data.std.primitive.BytePointable;
+import org.apache.hyracks.data.std.primitive.DoublePointable;
+import org.apache.hyracks.data.std.primitive.FloatPointable;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.primitive.ShortPointable;
+import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+
+/**
+ * Ascending comparator over serialized values whose first byte is a type tag.
+ *
+ * MISSING sorts before NULL, and NULL sorts before every other value. When the two tags differ but one
+ * can be promoted to the other (per {@link ATypeHierarchy#canPromote}), the promotable side is converted
+ * into {@link #castBuffer} and compared using the target type's comparator. Values whose tags are unknown,
+ * incompatible, or have no dedicated comparator are ordered by a raw byte-by-byte comparison that includes
+ * the tag byte.
+ *
+ * An instance owns a mutable cast buffer that compare() resets and refills.
+ * NOTE(review): that suggests an instance must not be shared across threads - confirm.
+ */
+class AGenericAscBinaryComparator implements IBinaryComparator {
+
+    // BOOLEAN
+    private final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // TINYINT
+    private final IBinaryComparator ascByteComp =
+            new PointableBinaryComparatorFactory(BytePointable.FACTORY).createBinaryComparator();
+    // SMALLINT
+    private final IBinaryComparator ascShortComp =
+            new PointableBinaryComparatorFactory(ShortPointable.FACTORY).createBinaryComparator();
+    // INTEGER
+    private final IBinaryComparator ascIntComp =
+            new PointableBinaryComparatorFactory(IntegerPointable.FACTORY).createBinaryComparator();
+    // BIGINT
+    private final IBinaryComparator ascLongComp = LongBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // STRING
+    private final IBinaryComparator ascStrComp =
+            new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator();
+    // BINARY
+    private final IBinaryComparator ascByteArrayComp =
+            new PointableBinaryComparatorFactory(ByteArrayPointable.FACTORY).createBinaryComparator();
+    // FLOAT
+    private final IBinaryComparator ascFloatComp =
+            new PointableBinaryComparatorFactory(FloatPointable.FACTORY).createBinaryComparator();
+    // DOUBLE
+    private final IBinaryComparator ascDoubleComp =
+            new PointableBinaryComparatorFactory(DoublePointable.FACTORY).createBinaryComparator();
+    // RECTANGLE
+    private final IBinaryComparator ascRectangleComp =
+            ARectanglePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // CIRCLE
+    private final IBinaryComparator ascCircleComp =
+            ACirclePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // DURATION
+    private final IBinaryComparator ascDurationComp =
+            ADurationPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // INTERVAL
+    private final IBinaryComparator ascIntervalComp =
+            AIntervalAscPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // LINE
+    private final IBinaryComparator ascLineComp = ALinePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // POINT
+    private final IBinaryComparator ascPointComp =
+            APointPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // POINT3D
+    private final IBinaryComparator ascPoint3DComp =
+            APoint3DPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // POLYGON
+    private final IBinaryComparator ascPolygonComp =
+            APolygonPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // UUID
+    private final IBinaryComparator ascUUIDComp = AUUIDPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+    // RAW
+    private final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+
+    // a storage to promote a value
+    private final ArrayBackedValueStorage castBuffer;
+    // declared types of the two inputs; stored here but not consulted by compare() in this class
+    private final IAType leftType;
+    private final IAType rightType;
+
+    /**
+     * @param leftType  declared type of the left input
+     * @param rightType declared type of the right input
+     */
+    AGenericAscBinaryComparator(IAType leftType, IAType rightType) {
+        this.leftType = leftType;
+        this.rightType = rightType;
+        this.castBuffer = new ArrayBackedValueStorage();
+    }
+
+    /**
+     * Compares two tagged values in ascending order.
+     *
+     * @param b1 bytes holding the left value; {@code b1[s1]} is its type tag
+     * @param s1 start offset of the left value (tag byte included)
+     * @param l1 length of the left value (tag byte included)
+     * @param b2 bytes holding the right value; {@code b2[s2]} is its type tag
+     * @param s2 start offset of the right value (tag byte included)
+     * @param l2 length of the right value (tag byte included)
+     * @return a negative, zero, or positive value when left is ordered before, equal to, or after right
+     * @throws HyracksDataException if a promotion is required but no promote computer exists or the
+     *                              conversion fails, or if a delegate comparator fails
+     */
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
+        // normally, comparing between MISSING and non-MISSING values should return MISSING as the result.
+        // however, this comparator is used by order-by/group-by/distinct-by.
+        // therefore, inside this method, we return an order between two values even if one value is MISSING.
+        if (b1[s1] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
+            return b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG ? 0 : -1;
+        }
+        if (b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
+            return 1;
+        }
+
+        // normally, comparing between NULL and non-NULL/MISSING values should return NULL as the result.
+        // however, this comparator is used by order-by/group-by/distinct-by.
+        // therefore, inside this method, we return an order between two values even if one value is NULL.
+        if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+            return b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG ? 0 : -1;
+        }
+        if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
+            return 1;
+        }
+
+        ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
+        ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
+
+        // if one of the tags is null, that means we are dealing with an empty byte array on one side.
+        // and, we don't need to continue. We just compare raw byte by byte.
+        if (tag1 == null || tag2 == null) {
+            return rawComp.compare(b1, s1, l1, b2, s2, l2);
+        }
+
+        // if the two types do not match, we identify the source and the target and
+        // promote the source to the target type if they are compatible.
+        ATypeTag targetTypeTag = tag1;
+        boolean typePromotionApplied = false;
+        boolean leftValueChanged = false;
+
+        if (tag1 != tag2) {
+            ATypeTag sourceTypeTag;
+            if (ATypeHierarchy.canPromote(tag1, tag2)) {
+                // tag1 can be promoted to tag2 (e.g. tag1: SMALLINT, tag2: INTEGER)
+                sourceTypeTag = tag1;
+                targetTypeTag = tag2;
+                leftValueChanged = true;
+            } else if (ATypeHierarchy.canPromote(tag2, tag1)) {
+                // or tag2 can be promoted to tag1 (e.g. tag2: INTEGER, tag1: DOUBLE)
+                sourceTypeTag = tag2;
+                targetTypeTag = tag1;
+            } else {
+                // if two tags are not compatible, then we compare raw byte by byte, including the type tag.
+                // this is especially useful when we need to generate some order between any two types.
+                return rawComp.compare(b1, s1, l1, b2, s2, l2);
+            }
+            typePromotionApplied = true;
+
+            // we promote the source to the target by using a promoteComputer
+            castBuffer.reset();
+            ITypeConvertComputer promoter = ATypeHierarchy.getTypePromoteComputer(sourceTypeTag, targetTypeTag);
+            if (promoter == null) {
+                // No appropriate typePromoteComputer.
+                throw new HyracksDataException("No appropriate typePromoteComputer exists for " + sourceTypeTag
+                        + " to the " + targetTypeTag + " type. Please check the code.");
+            }
+            try {
+                if (leftValueChanged) {
+                    // left side is the source
+                    promoter.convertType(b1, s1 + 1, l1 - 1, castBuffer.getDataOutput());
+                } else {
+                    // right side is the source
+                    promoter.convertType(b2, s2 + 1, l2 - 1, castBuffer.getDataOutput());
+                }
+            } catch (IOException e) {
+                // keep the original failure as the cause so the root problem is not lost
+                throw new HyracksDataException("ComparatorFactory - failed to promote the type:" + sourceTypeTag
+                        + " to the type:" + targetTypeTag, e);
+            }
+        }
+
+        // conduct actual compare()
+        switch (targetTypeTag) {
+            case UUID:
+                return ascUUIDComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case BOOLEAN:
+                return ascBoolComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case TINYINT:
+                // No type promotion from another type to the TINYINT can happen
+                return ascByteComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case SMALLINT:
+                return compareNumeric(ascShortComp, b1, s1, l1, b2, s2, l2, typePromotionApplied, leftValueChanged);
+            case TIME:
+            case DATE:
+            case YEARMONTHDURATION:
+            case INTEGER:
+                return compareNumeric(ascIntComp, b1, s1, l1, b2, s2, l2, typePromotionApplied, leftValueChanged);
+            case DATETIME:
+            case DAYTIMEDURATION:
+            case BIGINT:
+                return compareNumeric(ascLongComp, b1, s1, l1, b2, s2, l2, typePromotionApplied, leftValueChanged);
+            case FLOAT:
+                return compareNumeric(ascFloatComp, b1, s1, l1, b2, s2, l2, typePromotionApplied, leftValueChanged);
+            case DOUBLE:
+                return compareNumeric(ascDoubleComp, b1, s1, l1, b2, s2, l2, typePromotionApplied,
+                        leftValueChanged);
+            case STRING:
+                return ascStrComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case RECTANGLE:
+                return ascRectangleComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case CIRCLE:
+                return ascCircleComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case POINT:
+                return ascPointComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case POINT3D:
+                return ascPoint3DComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case LINE:
+                return ascLineComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case POLYGON:
+                return ascPolygonComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case DURATION:
+                return ascDurationComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case INTERVAL:
+                return ascIntervalComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            case BINARY:
+                return ascByteArrayComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+            default:
+                // we include typeTag in comparison to compare between two type to enforce some ordering
+                return rawComp.compare(b1, s1, l1, b2, s2, l2);
+        }
+    }
+
+    /**
+     * Compares two values of a promotable type, substituting the cast buffer for whichever side was promoted.
+     * The first byte of each operand (and of the cast buffer) is skipped before delegating to {@code comp}.
+     *
+     * @param comp         comparator for the target type, applied to untagged payloads
+     * @param promoted     whether one side was promoted into the cast buffer
+     * @param leftPromoted when {@code promoted}, whether the promoted side is the left one
+     */
+    private int compareNumeric(IBinaryComparator comp, byte[] b1, int s1, int l1, byte[] b2, int s2, int l2,
+            boolean promoted, boolean leftPromoted) throws HyracksDataException {
+        if (!promoted) {
+            // No type promotion case
+            return comp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+        if (leftPromoted) {
+            // Type promotion happened. Left side was the source
+            return comp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
+                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
+        }
+        // Type promotion happened. Right side was the source
+        return comp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
+                castBuffer.getLength() - 1);
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericDescBinaryComparator.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericDescBinaryComparator.java
new file mode 100644
index 0000000..afe1349
--- /dev/null
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AGenericDescBinaryComparator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.dataflow.data.nontagged.comparators;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Descending counterpart of {@link AGenericAscBinaryComparator}.
+ *
+ * Two INTERVAL values are compared with the dedicated descending interval comparator; every other pair
+ * is compared by the ascending comparator and the sign of that result is reversed.
+ */
+class AGenericDescBinaryComparator extends AGenericAscBinaryComparator {
+
+    // interval asc and desc comparators are not the inverse of each other.
+    // thus, we need to specify the interval desc comparator factory for descending comparisons.
+    private final IBinaryComparator descIntervalComp =
+            AIntervalDescPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+
+    /**
+     * @param leftType  declared type of the left input, passed through to the ascending comparator
+     * @param rightType declared type of the right input, passed through to the ascending comparator
+     */
+    AGenericDescBinaryComparator(IAType leftType, IAType rightType) {
+        super(leftType, rightType);
+    }
+
+    /**
+     * Compares two tagged values in descending order; {@code b1[s1]} and {@code b2[s2]} are the type tags.
+     *
+     * @return a negative, zero, or positive value; for non-interval inputs this is -1, 0 or 1
+     * @throws HyracksDataException if the underlying comparison fails
+     */
+    @Override
+    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
+        ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
+        ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
+        if (tag1 == ATypeTag.INTERVAL && tag2 == ATypeTag.INTERVAL) {
+            return descIntervalComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
+        }
+        // reverse by sign rather than unary minus: -Integer.MIN_VALUE overflows back to Integer.MIN_VALUE,
+        // which would leave the order un-reversed if a delegate comparator ever returned that value.
+        return -Integer.signum(super.compare(b1, s1, l1, b2, s2, l2));
+    }
+}
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
index 99a26ca..06bdd2a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectAscBinaryComparatorFactory.java
@@ -18,323 +18,69 @@
  */
 package org.apache.asterix.dataflow.data.nontagged.comparators;
 
-import java.io.IOException;
-
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
-import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
-import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
-import org.apache.hyracks.data.std.primitive.BytePointable;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.FloatPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
-import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 
 public class AObjectAscBinaryComparatorFactory implements IBinaryComparatorFactory {
 
     private static final long serialVersionUID = 1L;
+    // these fields can be null
+    private final IAType leftType;
+    private final IAType rightType;
+    private final boolean ascending;
 
-    public static final AObjectAscBinaryComparatorFactory INSTANCE = new AObjectAscBinaryComparatorFactory();
+    public AObjectAscBinaryComparatorFactory(IAType leftType, IAType rightType) {
+        this(leftType, rightType, true);
+    }
 
-    private AObjectAscBinaryComparatorFactory() {
+    protected AObjectAscBinaryComparatorFactory(IAType leftType, IAType rightType, boolean ascending) {
+        this.leftType = leftType;
+        this.rightType = rightType;
+        this.ascending = ascending;
     }
 
     @Override
     public IBinaryComparator createBinaryComparator() {
-        return new ABinaryComparator() {
-
-            // a storage to promote a value
-            private ArrayBackedValueStorage castBuffer = new ArrayBackedValueStorage();
-            private ITypeConvertComputer promoteComputer;
-
-            // BOOLEAN
-            final IBinaryComparator ascBoolComp = BooleanBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // TINYINT
-            final IBinaryComparator ascByteComp =
-                    new PointableBinaryComparatorFactory(BytePointable.FACTORY).createBinaryComparator();
-            // SMALLINT
-            final IBinaryComparator ascShortComp =
-                    new PointableBinaryComparatorFactory(ShortPointable.FACTORY).createBinaryComparator();
-            // INTEGER
-            final IBinaryComparator ascIntComp =
-                    new PointableBinaryComparatorFactory(IntegerPointable.FACTORY).createBinaryComparator();
-            // BIGINT
-            final IBinaryComparator ascLongComp = LongBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // STRING
-            final IBinaryComparator ascStrComp =
-                    new PointableBinaryComparatorFactory(UTF8StringPointable.FACTORY).createBinaryComparator();
-            // BINARY
-            final IBinaryComparator ascByteArrayComp =
-                    new PointableBinaryComparatorFactory(ByteArrayPointable.FACTORY).createBinaryComparator();
-            // FLOAT
-            final IBinaryComparator ascFloatComp =
-                    new PointableBinaryComparatorFactory(FloatPointable.FACTORY).createBinaryComparator();
-            // DOUBLE
-            final IBinaryComparator ascDoubleComp =
-                    new PointableBinaryComparatorFactory(DoublePointable.FACTORY).createBinaryComparator();
-            // RECTANGLE
-            final IBinaryComparator ascRectangleComp =
-                    ARectanglePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // CIRCLE
-            final IBinaryComparator ascCircleComp =
-                    ACirclePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // DURATION
-            final IBinaryComparator ascDurationComp =
-                    ADurationPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // INTERVAL
-            final IBinaryComparator ascIntervalComp =
-                    AIntervalAscPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // LINE
-            final IBinaryComparator ascLineComp = ALinePartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // POINT
-            final IBinaryComparator ascPointComp =
-                    APointPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // POINT3D
-            final IBinaryComparator ascPoint3DComp =
-                    APoint3DPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // POLYGON
-            final IBinaryComparator ascPolygonComp =
-                    APolygonPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // UUID
-            final IBinaryComparator ascUUIDComp = AUUIDPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            // RAW
-            final IBinaryComparator rawComp = RawBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
-
-                // Normally, comparing between MISSING and non-MISSING values should return MISSING as the result.
-                // However, this comparator is used by order-by/group-by/distinct-by.
-                // Therefore, inside this method, we return an order between two values even if one value is MISSING.
-                if (b1[s1] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
-                    return b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG ? 0 : -1;
-                } else {
-                    if (b2[s2] == ATypeTag.SERIALIZED_MISSING_TYPE_TAG) {
-                        return 1;
-                    }
-                }
-
-                // Normally, comparing between NULL and non-NULL/MISSING values should return NULL as the result.
-                // However, this comparator is used by order-by/group-by/distinct-by.
-                // Therefore, inside this method, we return an order between two values even if one value is NULL.
-                if (b1[s1] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
-                    return b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG ? 0 : -1;
-                } else {
-                    if (b2[s2] == ATypeTag.SERIALIZED_NULL_TYPE_TAG) {
-                        return 1;
-                    }
-                }
-
-                ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
-                ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
-
-                // If one of tag is null, that means we are dealing with an empty byte array in one side.
-                // And, we don't need to continue. We just compare raw byte by byte.
-                if (tag1 == null || tag2 == null) {
-                    return rawComp.compare(b1, s1, l1, b2, s2, l2);
-                }
-
-                // If two type does not match, we identify the source and the target and
-                // promote the source to the target type if they are compatible.
-                ATypeTag sourceTypeTag = null;
-                ATypeTag targetTypeTag = null;
-                boolean areTwoTagsEqual = false;
-                boolean typePromotionApplied = false;
-                boolean leftValueChanged = false;
-
-                if (tag1 != tag2) {
-                    // tag1 can be promoted to tag2 (e.g. tag1: SMALLINT, tag2: INTEGER)
-                    if (ATypeHierarchy.canPromote(tag1, tag2)) {
-                        sourceTypeTag = tag1;
-                        targetTypeTag = tag2;
-                        typePromotionApplied = true;
-                        leftValueChanged = true;
-                        // or tag2 can be promoted to tag1 (e.g. tag2: INTEGER, tag1: DOUBLE)
-                    } else if (ATypeHierarchy.canPromote(tag2, tag1)) {
-                        sourceTypeTag = tag2;
-                        targetTypeTag = tag1;
-                        typePromotionApplied = true;
-                    }
-
-                    // we promote the source to the target by using a promoteComputer
-                    if (typePromotionApplied) {
-                        castBuffer.reset();
-                        promoteComputer = ATypeHierarchy.getTypePromoteComputer(sourceTypeTag, targetTypeTag);
-                        if (promoteComputer != null) {
-                            try {
-                                if (leftValueChanged) {
-                                    // left side is the source
-                                    promoteComputer.convertType(b1, s1 + 1, l1 - 1, castBuffer.getDataOutput());
-                                } else {
-                                    // right side is the source
-                                    promoteComputer.convertType(b2, s2 + 1, l2 - 1, castBuffer.getDataOutput());
-                                }
-                            } catch (IOException e) {
-                                throw new HyracksDataException("ComparatorFactory - failed to promote the type:"
-                                        + sourceTypeTag + " to the type:" + targetTypeTag);
-                            }
-                        } else {
-                            // No appropriate typePromoteComputer.
-                            throw new HyracksDataException("No appropriate typePromoteComputer exists for "
-                                    + sourceTypeTag + " to the " + targetTypeTag + " type. Please check the code.");
-                        }
-                    }
-                } else {
-                    // tag1 == tag2.
-                    sourceTypeTag = tag1;
-                    targetTypeTag = tag1;
-                    areTwoTagsEqual = true;
-                }
-
-                // If two tags are not compatible, then we compare raw byte by byte, including the type tag.
-                // This is especially useful when we need to generate some order between any two types.
-                if ((!areTwoTagsEqual && !typePromotionApplied)) {
-                    return rawComp.compare(b1, s1, l1, b2, s2, l2);
-                }
-
-                // Conduct actual compare()
-                switch (targetTypeTag) {
-                    case UUID:
-                        return ascUUIDComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    case BOOLEAN: {
-                        return ascBoolComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case TINYINT: {
-                        // No type promotion from another type to the TINYINT can happen
-                        return ascByteComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case SMALLINT: {
-                        if (!typePromotionApplied) {
-                            // No type promotion case
-                            return ascShortComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                        } else if (leftValueChanged) {
-                            // Type promotion happened. Left side was the source
-                            return ascShortComp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
-                        } else {
-                            // Type promotion happened. Right side was the source
-                            return ascShortComp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(),
-                                    castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1);
-                        }
-                    }
-                    case TIME:
-                    case DATE:
-                    case YEARMONTHDURATION:
-                    case INTEGER: {
-                        if (!typePromotionApplied) {
-                            // No type promotion case
-                            return ascIntComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                        } else if (leftValueChanged) {
-                            // Type promotion happened. Left side was the source
-                            return ascIntComp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
-                        } else {
-                            // Type promotion happened. Right side was the source
-                            return ascIntComp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(),
-                                    castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1);
-                        }
-                    }
-                    case DATETIME:
-                    case DAYTIMEDURATION:
-                    case BIGINT: {
-                        if (!typePromotionApplied) {
-                            // No type promotion case
-                            return ascLongComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                        } else if (leftValueChanged) {
-                            // Type promotion happened. Left side was the source
-                            return ascLongComp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
-                        } else {
-                            // Type promotion happened. Right side was the source
-                            return ascLongComp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(),
-                                    castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1);
-                        }
-                    }
-                    case FLOAT: {
-                        if (!typePromotionApplied) {
-                            // No type promotion case
-                            return ascFloatComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                        } else if (leftValueChanged) {
-                            // Type promotion happened. Left side was the source
-                            return ascFloatComp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
-                        } else {
-                            // Type promotion happened. Right side was the source
-                            return ascFloatComp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(),
-                                    castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1);
-                        }
-                    }
-                    case DOUBLE: {
-                        if (!typePromotionApplied) {
-                            // No type promotion case
-                            return ascDoubleComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                        } else if (leftValueChanged) {
-                            // Type promotion happened. Left side was the source
-                            return ascDoubleComp.compare(castBuffer.getByteArray(), castBuffer.getStartOffset() + 1,
-                                    castBuffer.getLength() - 1, b2, s2 + 1, l2 - 1);
-                        } else {
-                            // Type promotion happened. Right side was the source
-                            return ascDoubleComp.compare(b1, s1 + 1, l1 - 1, castBuffer.getByteArray(),
-                                    castBuffer.getStartOffset() + 1, castBuffer.getLength() - 1);
-                        }
-                    }
-                    case STRING: {
-                        return ascStrComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case RECTANGLE: {
-                        return ascRectangleComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case CIRCLE: {
-                        return ascCircleComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case POINT: {
-                        return ascPointComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case POINT3D: {
-                        return ascPoint3DComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case LINE: {
-                        return ascLineComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case POLYGON: {
-                        return ascPolygonComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case DURATION: {
-                        return ascDurationComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case INTERVAL: {
-                        return ascIntervalComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    case BINARY: {
-                        return ascByteArrayComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                    }
-                    default: {
-                        // We include typeTag in comparison to compare between two type to enforce some ordering
-                        return rawComp.compare(b1, s1, l1, b2, s2, l2);
-                    }
-                }
-            }
-        };
+        return ascending ? new AGenericAscBinaryComparator(leftType, rightType)
+                : new AGenericDescBinaryComparator(leftType, rightType);
     }
 
     @Override
     public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
-        return registry.getClassIdentifier(getClass(), serialVersionUID);
+        return convertToJson(registry, getClass(), serialVersionUID);
+    }
+
+    JsonNode convertToJson(IPersistedResourceRegistry registry, Class<? extends IJsonSerializable> clazz, long version)
+            throws HyracksDataException {
+        ObjectNode jsonNode = registry.getClassIdentifier(clazz, version);
+        if (leftType != null) {
+            jsonNode.set("leftType", leftType.toJson(registry));
+        }
+        if (rightType != null) {
+            jsonNode.set("rightType", rightType.toJson(registry));
+        }
+        return jsonNode;
+    }
+
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+            throws HyracksDataException {
+        return convertToObject(registry, json, true);
     }
 
-    @SuppressWarnings("squid:S1172") // unused parameter
-    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
-        return INSTANCE;
+    static IJsonSerializable convertToObject(IPersistedResourceRegistry registry, JsonNode json, boolean asc)
+            throws HyracksDataException {
+        JsonNode leftNode = json.get("leftType");
+        JsonNode rightNode = json.get("rightType");
+        IAType leftType = leftNode == null || leftNode.isNull() ? null : (IAType) registry.deserialize(leftNode);
+        IAType rightType = rightNode == null || rightNode.isNull() ? null : (IAType) registry.deserialize(rightNode);
+        return asc ? new AObjectAscBinaryComparatorFactory(leftType, rightType)
+                : new AObjectDescBinaryComparatorFactory(leftType, rightType);
     }
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescBinaryComparatorFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescBinaryComparatorFactory.java
index 5de502c..2fc866a 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescBinaryComparatorFactory.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/comparators/AObjectDescBinaryComparatorFactory.java
@@ -18,56 +18,28 @@
  */
 package org.apache.asterix.dataflow.data.nontagged.comparators;
 
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IJsonSerializable;
 import org.apache.hyracks.api.io.IPersistedResourceRegistry;
 
 import com.fasterxml.jackson.databind.JsonNode;
 
-public class AObjectDescBinaryComparatorFactory implements IBinaryComparatorFactory {
+public class AObjectDescBinaryComparatorFactory extends AObjectAscBinaryComparatorFactory {
 
     private static final long serialVersionUID = 1L;
 
-    public static final IBinaryComparatorFactory INSTANCE = new AObjectDescBinaryComparatorFactory();
-
-    private AObjectDescBinaryComparatorFactory() {
-    }
-
-    @Override
-    public IBinaryComparator createBinaryComparator() {
-        return new ABinaryComparator() {
-            final IBinaryComparator ascComp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
-            // INTERVAL
-            // Interval asc and desc comparator factories are not the inverse of each other.
-            // Thus, we need to specify the interval desc comparator factory for descending comparisons.
-            final IBinaryComparator descIntervalComp =
-                    AIntervalDescPartialBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-
-            @Override
-            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) throws HyracksDataException {
-                ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b1[s1]);
-                ATypeTag tag2 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b2[s2]);
-                if (tag1 == ATypeTag.INTERVAL && tag2 == ATypeTag.INTERVAL) {
-                    return descIntervalComp.compare(b1, s1 + 1, l1 - 1, b2, s2 + 1, l2 - 1);
-                }
-                return -ascComp.compare(b1, s1, l1, b2, s2, l2);
-            }
-        };
+    public AObjectDescBinaryComparatorFactory(IAType leftType, IAType rightType) {
+        super(leftType, rightType, false);
     }
 
     @Override
     public JsonNode toJson(IPersistedResourceRegistry registry) throws HyracksDataException {
-        return registry.getClassIdentifier(getClass(), serialVersionUID);
+        return convertToJson(registry, getClass(), serialVersionUID);
     }
 
-    @SuppressWarnings("squid:S1172") // unused parameter
-    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json) {
-        return INSTANCE;
+    public static IJsonSerializable fromJson(IPersistedResourceRegistry registry, JsonNode json)
+            throws HyracksDataException {
+        return convertToObject(registry, json, false);
     }
-
 }
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryComparatorFactoryProvider.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryComparatorFactoryProvider.java
index 297b4c6..22cb6ee 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryComparatorFactoryProvider.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/BinaryComparatorFactoryProvider.java
@@ -86,21 +86,33 @@ public class BinaryComparatorFactoryProvider implements IBinaryComparatorFactory
     // TODO: We should incorporate this option more nicely, but I'd have to change algebricks.
     @Override
     public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending, boolean ignoreCase) {
-        if (type == null) {
-            return anyBinaryComparatorFactory(ascending);
+        return getBinaryComparatorFactory(type, type, ascending, ignoreCase);
+    }
+
+    @Override
+    public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) {
+        return getBinaryComparatorFactory(type, type, ascending);
+    }
+
+    @Override
+    public IBinaryComparatorFactory getBinaryComparatorFactory(Object leftType, Object rightType, boolean ascending,
+            boolean ignoreCase) {
+        if (leftType == null || rightType == null) {
+            return createGenericBinaryComparatorFactory(null, null, ascending);
         }
-        IAType aqlType = (IAType) type;
-        if (aqlType.getTypeTag() == ATypeTag.STRING && ignoreCase) {
+        IAType left = (IAType) leftType;
+        IAType right = (IAType) rightType;
+        if (left.getTypeTag() == ATypeTag.STRING && right.getTypeTag() == ATypeTag.STRING && ignoreCase) {
             return addOffset(UTF8STRING_LOWERCASE_POINTABLE_INSTANCE, ascending);
         }
-        return getBinaryComparatorFactory(type, ascending);
+        return createGenericBinaryComparatorFactory(left, right, ascending);
     }
 
     @Override
-    public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) {
+    public IBinaryComparatorFactory getBinaryComparatorFactory(Object leftType, Object rightType, boolean ascending) {
         // During a comparison, since proper type promotion among several numeric types are required,
         // we will use AObjectAscBinaryComparatorFactory, instead of using a specific comparator
-        return anyBinaryComparatorFactory(ascending);
+        return createGenericBinaryComparatorFactory((IAType) leftType, (IAType) rightType, ascending);
     }
 
     public IBinaryComparatorFactory getBinaryComparatorFactory(ATypeTag type, boolean ascending) {
@@ -108,7 +120,7 @@ public class BinaryComparatorFactoryProvider implements IBinaryComparatorFactory
             case ANY:
             case UNION:
                 // we could do smth better for nullable fields
-                return anyBinaryComparatorFactory(ascending);
+                return createGenericBinaryComparatorFactory(null, null, ascending);
             case NULL:
             case MISSING:
                 return new AnyBinaryComparatorFactory();
@@ -162,11 +174,12 @@ public class BinaryComparatorFactoryProvider implements IBinaryComparatorFactory
         return new OrderedBinaryComparatorFactory(inst, ascending);
     }
 
-    private IBinaryComparatorFactory anyBinaryComparatorFactory(boolean ascending) {
+    private IBinaryComparatorFactory createGenericBinaryComparatorFactory(IAType leftType, IAType rightType,
+            boolean ascending) {
         if (ascending) {
-            return AObjectAscBinaryComparatorFactory.INSTANCE;
+            return new AObjectAscBinaryComparatorFactory(leftType, rightType);
         } else {
-            return AObjectDescBinaryComparatorFactory.INSTANCE;
+            return new AObjectDescBinaryComparatorFactory(leftType, rightType);
         }
     }
 
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/BuiltinType.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/BuiltinType.java
index 4fc146f..76ba6a2 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/BuiltinType.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/types/BuiltinType.java
@@ -1153,7 +1153,7 @@ public abstract class BuiltinType implements IAType {
             case SHORTWITHOUTTYPEINFO:
                 return SHORTWITHOUTTYPEINFO;
             default:
-                throw new IllegalStateException();
+                throw new UnsupportedOperationException(typeTag.toString());
         }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index d33a6f7..bf8cf49 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -25,8 +25,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectDescBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -266,11 +265,9 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
             // create the generic comparator for each sort field
             IBinaryComparator[] fieldsComparators = new IBinaryComparator[ascending.length];
             for (int i = 0; i < ascending.length; i++) {
-                if (ascending[i]) {
-                    fieldsComparators[i] = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-                } else {
-                    fieldsComparators[i] = AObjectDescBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-                }
+                // TODO(ali): this is temporary
+                fieldsComparators[i] = BinaryComparatorFactoryProvider.INSTANCE
+                        .getBinaryComparatorFactory(null, null, ascending[i]).createBinaryComparator();
             }
 
             return (splitPoint1, splitPoint2) -> {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
index 8b5ba72..b761c0d 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
@@ -24,8 +24,9 @@ import java.io.IOException;
 
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.base.AMutableInt32;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.common.ListAccessor;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
@@ -39,6 +40,7 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractArraySearchEval implements IScalarEvaluator {
+    private IAType[] argTypes;
     private final IPointable listArg;
     private final IPointable searchedValueArg;
     private final IPointable tempVal;
@@ -50,20 +52,27 @@ public abstract class AbstractArraySearchEval implements IScalarEvaluator {
     private final AMutableInt32 intValue;
     protected final ArrayBackedValueStorage storage;
 
-    public AbstractArraySearchEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
-            throws HyracksDataException {
+    public AbstractArraySearchEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc,
+            IAType[] argTypes) throws HyracksDataException {
+        this.argTypes = argTypes;
         storage = new ArrayBackedValueStorage();
         listArg = new VoidPointable();
         searchedValueArg = new VoidPointable();
         tempVal = new VoidPointable();
         listEval = args[0].createScalarEvaluator(ctx);
         searchedValueEval = args[1].createScalarEvaluator(ctx);
-        comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        comp = createComparator();
         listAccessor = new ListAccessor();
         intValue = new AMutableInt32(-1);
         sourceLocation = sourceLoc;
     }
 
+    private IBinaryComparator createComparator() {
+        // TODO(ali): using old comparator behaviour for now. Should compute proper types based on args
+        return BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                .createBinaryComparator();
+    }
+
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
         // 1st arg: list
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java
index 5443834..16d07c3 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayContainsDescriptor.java
@@ -24,8 +24,11 @@ import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
 import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -53,12 +56,18 @@ import org.apache.hyracks.data.std.api.IPointable;
  */
 public class ArrayContainsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
+    private IAType[] argTypes;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new ArrayContainsDescriptor();
         }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.SET_ARGUMENTS_TYPE;
+        }
     };
 
     @Override
@@ -67,6 +76,11 @@ public class ArrayContainsDescriptor extends AbstractScalarFunctionDynamicDescri
     }
 
     @Override
+    public void setImmutableStates(Object... states) {
+        argTypes = (IAType[]) states;
+    }
+
+    @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
             throws AlgebricksException {
         return new IScalarEvaluatorFactory() {
@@ -83,8 +97,7 @@ public class ArrayContainsDescriptor extends AbstractScalarFunctionDynamicDescri
         private final ISerializerDeserializer booleanSerde;
 
         public ArrayContainsEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
-            super(args, ctx, sourceLoc);
-            // TODO(ali): should we get the nontagged serde?
+            super(args, ctx, sourceLoc, argTypes);
             booleanSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN);
         }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
index 0179d34..640331f 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
@@ -28,7 +28,7 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -123,7 +123,8 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri
             super(args, ctx, inputListType);
             this.sourceLoc = sourceLoc;
             hashes = new Int2ObjectOpenHashMap<>();
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
             binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                     .createBinaryHashFunction();
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 85ba01f..6b03899 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -30,9 +30,9 @@ import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -193,7 +193,8 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             finalResult = new ArrayBackedValueStorage();
             listAccessor = new ListAccessor();
             caster = new CastTypeEvaluator();
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
             listsArgs = new IPointable[args.length];
             listsEval = new IScalarEvaluator[args.length];
             pointable = new VoidPointable();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java
index 411e846..884136e 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPositionDescriptor.java
@@ -23,8 +23,11 @@ import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.om.functions.IFunctionTypeInferer;
 import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
+import org.apache.asterix.runtime.functions.FunctionTypeInferers;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
@@ -52,12 +55,18 @@ import org.apache.hyracks.data.std.api.IPointable;
  */
 public class ArrayPositionDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
+    private IAType[] argTypes;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
         @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new ArrayPositionDescriptor();
         }
+
+        @Override
+        public IFunctionTypeInferer createFunctionTypeInferer() {
+            return FunctionTypeInferers.SET_ARGUMENTS_TYPE;
+        }
     };
 
     @Override
@@ -66,6 +75,11 @@ public class ArrayPositionDescriptor extends AbstractScalarFunctionDynamicDescri
     }
 
     @Override
+    public void setImmutableStates(Object... states) {
+        argTypes = (IAType[]) states;
+    }
+
+    @Override
     public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
             throws AlgebricksException {
         return new IScalarEvaluatorFactory() {
@@ -82,7 +96,7 @@ public class ArrayPositionDescriptor extends AbstractScalarFunctionDynamicDescri
         private final ISerializerDeserializer intSerde;
 
         public ArrayPositionEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
-            super(args, ctx, sourceLoc);
+            super(args, ctx, sourceLoc, argTypes);
             intSerde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
         }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
index 571fb4c..e6cd489 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
@@ -21,9 +21,9 @@ package org.apache.asterix.runtime.evaluators.functions;
 import java.io.IOException;
 
 import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
 import org.apache.asterix.dataflow.data.nontagged.serde.AOrderedListSerializerDeserializer;
 import org.apache.asterix.dataflow.data.nontagged.serde.AUnorderedListSerializerDeserializer;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -108,7 +108,8 @@ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor
 
         public ArrayPutEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, true, false);
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
             storage = new ArrayBackedValueStorage();
             item = new VoidPointable();
             add = new boolean[args.length - 1];
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
index 79288e1..07d8255 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.asterix.runtime.evaluators.functions;
 import java.io.IOException;
 
 import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -106,7 +106,8 @@ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescript
             super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, false, false);
             storage = new ArrayBackedValueStorage();
             item = new VoidPointable();
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
         }
 
         @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
index f591b54..ced6fa9 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.UnorderedListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -154,7 +154,8 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
             caster = new CastTypeEvaluator();
             orderedListBuilder = null;
             unorderedListBuilder = null;
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
         }
 
         @Override
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
index b2cc7ba..73976f3 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
@@ -27,7 +27,7 @@ import java.util.PriorityQueue;
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
@@ -104,7 +104,8 @@ public class ArraySortDescriptor extends AbstractScalarFunctionDynamicDescriptor
     }
 
     protected class ArraySortComparator implements Comparator<IPointable> {
-        private final IBinaryComparator comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        private final IBinaryComparator comp = BinaryComparatorFactoryProvider.INSTANCE
+                .getBinaryComparatorFactory(null, null, true).createBinaryComparator();
 
         @Override
         public int compare(IPointable val1, IPointable val2) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
index d649c48..e7e73e6 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
@@ -57,7 +57,8 @@ public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
         arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
         valueCounterAllocator = new ListObjectPool<>(new ValueCounterFactory());
         hashes = new Int2ObjectOpenHashMap<>();
-        comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                .createBinaryComparator();
         intHashes = new IntArrayList(50, 10);
         binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                 .createBinaryHashFunction();
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
index d6a2f1f..9a7a021 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayUnionDescriptor.java
@@ -20,11 +20,9 @@ package org.apache.asterix.runtime.evaluators.functions;
 
 import java.util.List;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
-import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
+import org.apache.asterix.formats.nontagged.BinaryComparatorFactoryProvider;
 import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
@@ -47,6 +45,9 @@ import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 /**
  * <pre>
  * array_union(list1, list2, ...) returns a new list with the set union of the input lists (no duplicates).
@@ -114,7 +115,8 @@ public class ArrayUnionDescriptor extends AbstractScalarFunctionDynamicDescripto
             super(args, ctx, true, sourceLoc, argTypes);
             pointableListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
             hashes = new Int2ObjectOpenHashMap<>();
-            comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+            comp = BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(null, null, true)
+                    .createBinaryComparator();
             binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                     .createBinaryHashFunction();
         }
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
index 6a81005..38ccee5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractHashJoinPOperator.java
@@ -30,8 +30,8 @@ import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator.JoinKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.properties.BroadcastPartitioningProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
 import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
@@ -86,6 +86,12 @@ public abstract class AbstractHashJoinPOperator extends AbstractJoinPOperator {
         this.deliveredProperties = new StructuralPropertiesVector(pp, deliveredLocalProperties(iop, context));
     }
 
+    static void validateNumKeys(List<LogicalVariable> keysLeftBranch, List<LogicalVariable> keysRightBranch) {
+        if (keysLeftBranch.size() != keysRightBranch.size()) {
+            throw new IllegalStateException("Number of keys of left and right branch are not equal in hash join");
+        }
+    }
+
     @Override
     public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
             IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 091cc44..6d8cf30 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -110,17 +110,22 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
+        validateNumKeys(keysLeftBranch, keysRightBranch);
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
         IBinaryHashFunctionFamily[] hashFunFamilies =
                 JobGenHelper.variablesToBinaryHashFunctionFamilies(keysLeftBranch, env, context);
-        IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
-        int i = 0;
+        IBinaryComparatorFactory[] leftCompFactories = new IBinaryComparatorFactory[keysLeft.length];
+        IBinaryComparatorFactory[] rightCompFactories = new IBinaryComparatorFactory[keysRight.length];
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-        for (LogicalVariable v : keysLeftBranch) {
-            Object t = env.getVarType(v);
-            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        Object leftType;
+        Object rightType;
+        for (int i = 0; i < keysLeftBranch.size(); i++) {
+            leftType = env.getVarType(keysLeftBranch.get(i));
+            rightType = env.getVarType(keysRightBranch.get(i));
+            leftCompFactories[i] = bcfp.getBinaryComparatorFactory(leftType, rightType, true);
+            rightCompFactories[i] = bcfp.getBinaryComparatorFactory(rightType, leftType, true);
         }
 
         IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
@@ -134,7 +139,7 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
         IOperatorDescriptor opDesc;
 
         opDesc = generateOptimizedHashJoinRuntime(context, inputSchemas, keysLeft, keysRight, hashFunFamilies,
-                comparatorFactories, predEvaluatorFactory, recDescriptor, spec);
+                leftCompFactories, rightCompFactories, predEvaluatorFactory, recDescriptor, spec);
         opDesc.setSourceLocation(op.getSourceLocation());
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
 
@@ -146,15 +151,16 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
 
     private IOperatorDescriptor generateOptimizedHashJoinRuntime(JobGenContext context, IOperatorSchema[] inputSchemas,
             int[] keysLeft, int[] keysRight, IBinaryHashFunctionFamily[] hashFunFamilies,
-            IBinaryComparatorFactory[] comparatorFactories, IPredicateEvaluatorFactory predEvaluatorFactory,
-            RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec) throws AlgebricksException {
+            IBinaryComparatorFactory[] leftCompFactories, IBinaryComparatorFactory[] rightCompFactories,
+            IPredicateEvaluatorFactory predEvaluatorFactory, RecordDescriptor recDescriptor,
+            IOperatorDescriptorRegistry spec) {
         switch (kind) {
             case INNER:
                 return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                         maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                        comparatorFactories, recDescriptor,
-                        new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
-                        new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft), predEvaluatorFactory);
+                        leftCompFactories, rightCompFactories, recDescriptor,
+                        new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
+                        new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory);
             case LEFT_OUTER:
                 IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[inputSchemas[1].getSize()];
                 for (int j = 0; j < nonMatchWriterFactories.length; j++) {
@@ -162,9 +168,9 @@ public class HybridHashJoinPOperator extends AbstractHashJoinPOperator {
                 }
                 return new OptimizedHybridHashJoinOperatorDescriptor(spec, getMemSizeInFrames(),
                         maxInputBuildSizeInFrames, getFudgeFactor(), keysLeft, keysRight, hashFunFamilies,
-                        comparatorFactories, recDescriptor,
-                        new JoinMultiComparatorFactory(comparatorFactories, keysLeft, keysRight),
-                        new JoinMultiComparatorFactory(comparatorFactories, keysRight, keysLeft), predEvaluatorFactory,
+                        leftCompFactories, rightCompFactories, recDescriptor,
+                        new JoinMultiComparatorFactory(leftCompFactories, keysLeft, keysRight),
+                        new JoinMultiComparatorFactory(rightCompFactories, keysRight, keysLeft), predEvaluatorFactory,
                         true, nonMatchWriterFactories);
             default:
                 throw new NotImplementedException();
@@ -216,8 +222,7 @@ class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
     private final int[] keysLeft;
     private final int[] keysRight;
 
-    public JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft,
-            int[] keysRight) {
+    JoinMultiComparatorFactory(IBinaryComparatorFactory[] binaryComparatorFactory, int[] keysLeft, int[] keysRight) {
         this.binaryComparatorFactories = binaryComparatorFactory;
         this.keysLeft = keysLeft;
         this.keysRight = keysRight;
@@ -234,7 +239,7 @@ class JoinMultiComparatorFactory implements ITuplePairComparatorFactory {
 }
 
 /**
- * {@ ITuplePairComparator} implementation for optimized hybrid hash join.
+ * {@code ITuplePairComparator} implementation for optimized hybrid hash join.
  * The comparator applies multiple binary comparators, one for each key pairs
  */
 class JoinMultiComparator implements ITuplePairComparator {
@@ -242,7 +247,7 @@ class JoinMultiComparator implements ITuplePairComparator {
     private final int[] keysLeft;
     private final int[] keysRight;
 
-    public JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
+    JoinMultiComparator(IBinaryComparator[] bComparator, int[] keysLeft, int[] keysRight) {
         this.binaryComparators = bComparator;
         this.keysLeft = keysLeft;
         this.keysRight = keysRight;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 580d8e1..58e10ba 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -84,17 +84,20 @@ public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
+        validateNumKeys(keysLeftBranch, keysRightBranch);
         int[] keysLeft = JobGenHelper.variablesToFieldIndexes(keysLeftBranch, inputSchemas[0]);
         int[] keysRight = JobGenHelper.variablesToFieldIndexes(keysRightBranch, inputSchemas[1]);
         IVariableTypeEnvironment env = context.getTypeEnvironment(op);
         IBinaryHashFunctionFactory[] hashFunFactories =
                 JobGenHelper.variablesToBinaryHashFunctionFactories(keysLeftBranch, env, context);
         IBinaryComparatorFactory[] comparatorFactories = new IBinaryComparatorFactory[keysLeft.length];
-        int i = 0;
         IBinaryComparatorFactoryProvider bcfp = context.getBinaryComparatorFactoryProvider();
-        for (LogicalVariable v : keysLeftBranch) {
-            Object t = env.getVarType(v);
-            comparatorFactories[i++] = bcfp.getBinaryComparatorFactory(t, true);
+        Object leftType;
+        Object rightType;
+        for (int i = 0; i < keysLeftBranch.size(); i++) {
+            leftType = env.getVarType(keysLeftBranch.get(i));
+            rightType = env.getVarType(keysRightBranch.get(i));
+            comparatorFactories[i] = bcfp.getBinaryComparatorFactory(leftType, rightType, true);
         }
 
         IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
@@ -142,9 +145,9 @@ public class InMemoryHashJoinPOperator extends AbstractHashJoinPOperator {
         List<ILocalStructuralProperty> lp0 = pv0.getLocalProperties();
         if (lp0 != null) {
             // maintains the local properties on the probe side
-            return new LinkedList<ILocalStructuralProperty>(lp0);
+            return new LinkedList<>(lp0);
         }
-        return new LinkedList<ILocalStructuralProperty>();
+        return new LinkedList<>();
     }
 
 }
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
index 9584c69..c8c4a24 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
@@ -22,33 +22,49 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
 /**
- * Provides {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} for different types
+ * Provides {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} for different types. Whether a
+ * factory is stateful or stateless is implementation-specific. Also, whether a new factory is created each time or a
+ * single instance factory is returned is implementation-specific. Therefore, no assumptions should be made about
+ * these two aspects.
  */
 public interface IBinaryComparatorFactoryProvider {
 
     /**
-     * @param type
-     *            the type of the binary data
-     * @param ascending
-     *            the order direction. true if ascending order is desired, false otherwise
+     * @param type the type of the binary data
+     * @param ascending the order direction. true if ascending order is desired, false otherwise
      * @return the appropriate {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance
-     * @throws AlgebricksException
-     *             if the comparator factory for the passed type could not be created
+     * @throws AlgebricksException if the comparator factory for the passed type could not be created
      */
     IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) throws AlgebricksException;
 
     /**
-     *
-     * @param type
-     *            the type of the binary data
-     * @param ascending
-     *            the order direction. true if ascending order is desired, false otherwise
-     * @param ignoreCase
-     *            ignore case for strings
+     * @param type the type of the binary data
+     * @param ascending the order direction. true if ascending order is desired, false otherwise
+     * @param ignoreCase ignore case for strings
      * @return the appropriate {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance
-     * @throws AlgebricksException
-     *             if the comparator factory for the passed type could not be created
+     * @throws AlgebricksException if the comparator factory for the passed type could not be created
      */
     IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending, boolean ignoreCase)
             throws AlgebricksException;
+
+    /**
+     * @param leftType the type of the left binary data
+     * @param rightType the type of the right binary data
+     * @param ascending the order direction. true if ascending order is desired, false otherwise
+     * @return the appropriate {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance
+     * @throws AlgebricksException if the comparator factory for the passed types could not be created
+     */
+    IBinaryComparatorFactory getBinaryComparatorFactory(Object leftType, Object rightType, boolean ascending)
+            throws AlgebricksException;
+
+    /**
+     * @param leftType the type of the left binary data
+     * @param rightType the type of the right binary data
+     * @param ascending the order direction. true if ascending order is desired, false otherwise
+     * @param ignoreCase ignore case for strings
+     * @return the appropriate {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance
+     * @throws AlgebricksException if the comparator factory for the passed types could not be created
+     */
+    IBinaryComparatorFactory getBinaryComparatorFactory(Object leftType, Object rightType, boolean ascending,
+            boolean ignoreCase) throws AlgebricksException;
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index e7984d6..2452ae5 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -124,7 +124,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
     private final int[] probeKeys;
     private final int[] buildKeys;
     private final IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories;
-    private final IBinaryComparatorFactory[] comparatorFactories; //For in-mem HJ
+    private final IBinaryComparatorFactory[] probCompFactories; //For in-mem HJ
+    private final IBinaryComparatorFactory[] buildCompFactories; //For in-mem HJ
     private final ITuplePairComparatorFactory tuplePairComparatorFactoryProbe2Build; //For NLJ in probe
     private final ITuplePairComparatorFactory tuplePairComparatorFactoryBuild2Probe; //For NLJ in probe
     private final IPredicateEvaluatorFactory predEvaluatorFactory;
@@ -141,8 +142,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
             int inputsize0, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] probCompFactories,
+            IBinaryComparatorFactory[] buildCompFactories, RecordDescriptor recordDescriptor,
+            ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory,
             boolean isLeftOuter, IMissingWriterFactory[] nonMatchWriterFactories) {
         super(spec, 2, 1);
@@ -152,7 +154,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
         this.probeKeys = keys0;
         this.buildKeys = keys1;
         this.hashFunctionGeneratorFactories = hashFunctionGeneratorFactories;
-        this.comparatorFactories = comparatorFactories;
+        this.probCompFactories = probCompFactories;
+        this.buildCompFactories = buildCompFactories;
         this.tuplePairComparatorFactoryProbe2Build = tupPaircomparatorFactory01;
         this.tuplePairComparatorFactoryBuild2Probe = tupPaircomparatorFactory10;
         outRecDescs[0] = recordDescriptor;
@@ -163,11 +166,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
             int inputsize0, double factor, int[] keys0, int[] keys1,
-            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] comparatorFactories,
-            RecordDescriptor recordDescriptor, ITuplePairComparatorFactory tupPaircomparatorFactory01,
+            IBinaryHashFunctionFamily[] hashFunctionGeneratorFactories, IBinaryComparatorFactory[] probCompFactories,
+            IBinaryComparatorFactory[] buildCompFactories, RecordDescriptor recordDescriptor,
+            ITuplePairComparatorFactory tupPaircomparatorFactory01,
             ITuplePairComparatorFactory tupPaircomparatorFactory10, IPredicateEvaluatorFactory predEvaluatorFactory) {
-        this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories,
-                comparatorFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
+        this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1, hashFunctionGeneratorFactories, probCompFactories,
+                buildCompFactories, recordDescriptor, tupPaircomparatorFactory01, tupPaircomparatorFactory10,
                 predEvaluatorFactory, false, null);
     }
 
@@ -262,9 +266,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
 
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
-            for (int i = 0; i < comparatorFactories.length; i++) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            final IBinaryComparator[] probComparators = new IBinaryComparator[probCompFactories.length];
+            for (int i = 0; i < probCompFactories.length; i++) {
+                probComparators[i] = probCompFactories[i].createBinaryComparator();
             }
 
             final IPredicateEvaluator predEvaluator =
@@ -291,7 +295,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     state.numOfPartitions =
                             getNumberOfPartitions(state.memForJoin, inputsize0, fudgeFactor, nPartitions);
                     state.hybridHJ = new OptimizedHybridHashJoin(ctx, state.memForJoin, state.numOfPartitions,
-                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, comparators, probeRd, buildRd, probeHpc,
+                            PROBE_REL, BUILD_REL, probeKeys, buildKeys, probComparators, probeRd, buildRd, probeHpc,
                             buildHpc, predEvaluator, isLeftOuter, nonMatchWriterFactories);
 
                     state.hybridHJ.initBuild();
@@ -355,7 +359,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
             final RecordDescriptor buildRd = recordDescProvider.getInputRecordDescriptor(buildAid, 0);
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
-            final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
+            final IBinaryComparator[] probComp = new IBinaryComparator[probCompFactories.length];
+            final IBinaryComparator[] buildComp = new IBinaryComparator[buildCompFactories.length];
             final ITuplePairComparator nljComparatorProbe2Build =
                     tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
             final ITuplePairComparator nljComparatorBuild2Probe =
@@ -363,10 +368,12 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
             final IPredicateEvaluator predEvaluator =
                     predEvaluatorFactory == null ? null : predEvaluatorFactory.createPredicateEvaluator();
 
-            for (int i = 0; i < comparatorFactories.length; i++) {
-                comparators[i] = comparatorFactories[i].createBinaryComparator();
+            for (int i = 0; i < probCompFactories.length; i++) {
+                probComp[i] = probCompFactories[i].createBinaryComparator();
+            }
+            for (int i = 0; i < buildCompFactories.length; i++) {
+                buildComp[i] = buildCompFactories[i].createBinaryComparator();
             }
-
             final IMissingWriter[] nonMatchWriter =
                     isLeftOuter ? new IMissingWriter[nonMatchWriterFactories.length] : null;
             final ArrayTupleBuilder nullTupleBuild =
@@ -519,7 +526,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             }
                             //Build Side is smaller
                             applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
-                                    buildSideReader, probeSideReader); // checked-confirmed
+                                    buildSideReader, probeSideReader, probComp); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
                             if (LOGGER.isDebugEnabled()) {
                                 LOGGER.debug("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ"
@@ -532,7 +539,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             }
                             //Probe Side is smaller
                             applyInMemHashJoin(probeKeys, buildKeys, tabSize, probeRd, buildRd, probeHpc, buildHpc,
-                                    probeSideReader, buildSideReader); // checked-confirmed
+                                    probeSideReader, buildSideReader, buildComp); // checked-confirmed
                         }
                     }
                     //Apply (Recursive) HHJ
@@ -549,7 +556,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                             }
                             applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                                     probeRd, buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level,
-                                    beforeMax);
+                                    beforeMax, probComp);
 
                         } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
                             if (LOGGER.isDebugEnabled()) {
@@ -559,7 +566,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                             applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
                                     buildRd, probeRd, buildHpc, probeHpc, buildSideReader, probeSideReader, level,
-                                    beforeMax);
+                                    beforeMax, buildComp);
 
                         }
                     }
@@ -569,7 +576,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                         final int[] probeKeys, final int[] buildKeys, final RecordDescriptor probeRd,
                         final RecordDescriptor buildRd, final ITuplePartitionComputer probeHpc,
                         final ITuplePartitionComputer buildHpc, RunFileReader probeSideReader,
-                        RunFileReader buildSideReader, final int level, final long beforeMax)
+                        RunFileReader buildSideReader, final int level, final long beforeMax, IBinaryComparator[] comp)
                         throws HyracksDataException {
 
                     boolean isReversed = probeKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
@@ -578,7 +585,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     OptimizedHybridHashJoin rHHj;
                     int n = getNumberOfPartitions(state.memForJoin, tableSize, fudgeFactor, nPartitions);
                     rHHj = new OptimizedHybridHashJoin(ctx, state.memForJoin, n, PROBE_REL, BUILD_REL, probeKeys,
-                            buildKeys, comparators, probeRd, buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter,
+                            buildKeys, comp, probeRd, buildRd, probeHpc, buildHpc, predEvaluator, isLeftOuter,
                             nonMatchWriterFactories); //checked-confirmed
 
                     rHHj.setIsReversed(isReversed);
@@ -710,8 +717,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
 
                 private void applyInMemHashJoin(int[] bKeys, int[] pKeys, int tabSize, RecordDescriptor buildRDesc,
                         RecordDescriptor probeRDesc, ITuplePartitionComputer hpcRepBuild,
-                        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader)
-                        throws HyracksDataException {
+                        ITuplePartitionComputer hpcRepProbe, RunFileReader bReader, RunFileReader pReader,
+                        IBinaryComparator[] comp) throws HyracksDataException {
                     boolean isReversed = pKeys == OptimizedHybridHashJoinOperatorDescriptor.this.buildKeys
                             && bKeys == OptimizedHybridHashJoinOperatorDescriptor.this.probeKeys;
                     assert isLeftOuter ? !isReversed : true : "LeftOut Join can not reverse roles";
@@ -722,7 +729,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD
                     ISerializableTable table = new SerializableHashTable(tabSize, ctx, bufferManager);
                     InMemoryHashJoin joiner = new InMemoryHashJoin(ctx, new FrameTupleAccessor(probeRDesc), hpcRepProbe,
                             new FrameTupleAccessor(buildRDesc), buildRDesc, hpcRepBuild,
-                            new FrameTuplePairComparator(pKeys, bKeys, comparators), isLeftOuter, nonMatchWriter, table,
+                            new FrameTuplePairComparator(pKeys, bKeys, comp), isLeftOuter, nonMatchWriter, table,
                             predEvaluator, isReversed, bufferManager);
 
                     try {
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index a87dc1e..94ea8c5 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -180,6 +180,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 1.2, new int[] { 1 }, new int[] { 0 },
                 new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null,
@@ -288,6 +289,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 1.2, new int[] { 0 }, new int[] { 1 },
                 new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null,
@@ -401,6 +403,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 new int[] { 1 }, new int[] { 0 },
                 new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null,
@@ -581,6 +584,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 1.2, new int[] { 0 }, new int[] { 1 },
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
@@ -631,6 +635,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 1.2, new int[] { 0 }, new int[] { 1 },
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
@@ -682,6 +687,7 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest {
                 1.2, new int[] { 0 }, new int[] { 1 },
                 new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                 custOrderJoinDesc,
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                 new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index a0d40ee..7744974 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -190,6 +190,7 @@ public class Join {
                     new int[] { 0 }, new int[] { 1 },
                     new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE },
                     new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                    new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
                     Common.custOrderJoinDesc,
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1),
                     new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0),