You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2019/02/03 10:00:30 UTC

[asterixdb] branch master updated (4e1773f -> 0615d61)

This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git.


    from 4e1773f  [NO ISSUE] Interval join tests written in SQL++
     new 83f2efa  [NO ISSUE][NET] Catch All Network Unexpected Exceptions
     new 04aebfe  [FUN][RT] Objects creation in array functions
     new 0615d61  Merge commit '04aebfe' from 'stabilization-f69489' into 'master'

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../array_intersect/array_intersect.3.query.sqlpp  |   5 +-
 .../array_intersect/array_intersect.3.adm          |   2 +-
 .../array_fun/array_symdiff/array_symdiff.3.adm    |   2 +-
 .../array_fun/array_symdiffn/array_symdiffn.3.adm  |   2 +-
 .../functions/AbstractArrayAddRemoveEval.java      | 137 +++++-----
 .../functions/AbstractArrayProcessArraysEval.java  |  71 +++---
 .../functions/AbstractArrayProcessEval.java        |  12 +-
 .../functions/AbstractArraySearchEval.java         |   9 +-
 .../functions/ArrayDistinctDescriptor.java         |   4 +-
 .../functions/ArrayFlattenDescriptor.java          |  69 +++---
 .../functions/ArrayIntersectDescriptor.java        | 275 ++++++++++++---------
 .../evaluators/functions/ArrayPutDescriptor.java   |  27 +-
 .../functions/ArrayRemoveDescriptor.java           |  10 +-
 .../functions/ArrayReplaceDescriptor.java          |  67 ++---
 .../evaluators/functions/ArraySortDescriptor.java  |   4 +-
 .../evaluators/functions/ArrayStarDescriptor.java  | 139 ++++++++---
 .../evaluators/functions/ArraySymDiffEval.java     |  21 +-
 .../evaluators/functions/CastTypeEvaluator.java    |  14 +-
 .../hyracks/ipc/impl/IPCConnectionManager.java     |  10 +-
 19 files changed, 529 insertions(+), 351 deletions(-)


[asterixdb] 02/03: [FUN][RT] Objects creation in array functions

Posted by mh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 04aebfe738775b55d94a956f530840f748e7a4c1
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Sat Jan 26 14:07:32 2019 -0800

    [FUN][RT] Objects creation in array functions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - deallocate caster pointables for array functions using casting
    - avoid object creations in:
    ArrayIntersectDescriptor, AbstractArrayProcessEval, ArrayPutDescriptor, ArrayStarDescriptor
    - avoid iterator creations in:
    ArrayIntersectDescriptor, ArrayStarDescriptor, ArraySymDiffEval
    - avoid evaluating the list arguments twice when casting them
    - use getOrWriteItem() instead of writeItem() when accessing a serialized list
    - fix array_intersect to pick the smallest list as a starting list
    
    Change-Id: Ib6c8c55ed3e0a35e00c5976a46e9ed6e432a6e9f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3129
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../array_intersect/array_intersect.3.query.sqlpp  |   5 +-
 .../array_intersect/array_intersect.3.adm          |   2 +-
 .../array_fun/array_symdiff/array_symdiff.3.adm    |   2 +-
 .../array_fun/array_symdiffn/array_symdiffn.3.adm  |   2 +-
 .../functions/AbstractArrayAddRemoveEval.java      | 137 +++++-----
 .../functions/AbstractArrayProcessArraysEval.java  |  71 +++---
 .../functions/AbstractArrayProcessEval.java        |  12 +-
 .../functions/AbstractArraySearchEval.java         |   9 +-
 .../functions/ArrayDistinctDescriptor.java         |   4 +-
 .../functions/ArrayFlattenDescriptor.java          |  69 +++---
 .../functions/ArrayIntersectDescriptor.java        | 275 ++++++++++++---------
 .../evaluators/functions/ArrayPutDescriptor.java   |  27 +-
 .../functions/ArrayRemoveDescriptor.java           |  10 +-
 .../functions/ArrayReplaceDescriptor.java          |  67 ++---
 .../evaluators/functions/ArraySortDescriptor.java  |   4 +-
 .../evaluators/functions/ArrayStarDescriptor.java  | 139 ++++++++---
 .../evaluators/functions/ArraySymDiffEval.java     |  21 +-
 .../evaluators/functions/CastTypeEvaluator.java    |  14 +-
 18 files changed, 524 insertions(+), 346 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
index 3de0126..b5bbbb3 100755
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/array_fun/array_intersect/array_intersect.3.query.sqlpp
@@ -20,7 +20,7 @@
 use TinySocial;
 
 {
-  "t1": (select array_intersect(t.`referred-topics`, {{"t-mobile", "platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
+  "t1": (select t.tweetid, array_intersect(t.`referred-topics`, {{"t-mobile", "platform"}}, {{"t-mobile"}}) from TweetMessages t order by t.tweetid),
   "t2": (select array_intersect([1, "John", 2], (select value v.id from d1 v), [2,4,1])),
   "t3": (array_intersect([3,5,1], [5,7,3], [3,2,5,1])),
   "t4": (array_intersect([3,5.0,1], [5,7,3], [3,2,5,1])),
@@ -36,5 +36,6 @@ use TinySocial;
   "t14": (array_intersect(missing, "non_array", [2,5,1])),
   "t15": (array_intersect([], [], [])),
   "t16": (array_intersect([], [3,2], [])),
-  "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth", "John Green"]) from d1 d)
+  "t17": (select array_intersect(d.followers, ["John Green", "sth"], ["sth", "John Green"]) from d1 d),
+  "t18": (array_intersect([1,2], [3,2,1], [1,2,3,4]))
 };
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
index 2276eb4..368becd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_intersect/array_intersect.3.adm
@@ -1 +1 @@
-{ "t1": [ { "$1": {{ "t-mobile" }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{  }} }, { "$1": {{ "t-mobile" }} }, { "$1": {{  }} } ], "t2": [ { "$2": [ 2, 1 ] } ], "t3": [ 3, 5 ], "t4": [ 3, 5 ], "t5": [ 3, "a" ], "t6": [ 3 ], "t7": [  ], "t8": [  ], "t9": [  ], "t10": [  ], "t12": null, "t13": null, "t15": [  ], "t16": [  ], "t17": [ {  }, { "$3": [ "John Green" ] } ] }
+{ "t1": [ { "tweetid": "1", "$1": {{ "t-mobile" }} }, { "tweetid": "10", "$1": {{  }} }, { "tweetid": "11", "$1": {{  }} }, { "tweetid": "12", "$1": {{  }} }, { "tweetid": "2", "$1": {{  }} }, { "tweetid": "3", "$1": {{  }} }, { "tweetid": "4", "$1": {{  }} }, { "tweetid": "5", "$1": {{  }} }, { "tweetid": "6", "$1": {{  }} }, { "tweetid": "7", "$1": {{  }} }, { "tweetid": "8", "$1": {{ "t-mobile" }} }, { "tweetid": "9", "$1": {{  }} } ], "t2": [ { "$2": [ 2, 1 ] } ], "t3": [ 3, 5 ], "t4 [...]
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiff/array_symdiff.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiff/array_symdiff.3.adm
index de24df1..cc8cb81 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiff/array_symdiff.3.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiff/array_symdiff.3.adm
@@ -1 +1 @@
-{ "t1": [ { "$1": {{ "platform", "customization", "coffee-mobile" }} }, { "$1": {{ "platform", "verizon", "voice-clarity", "t-mobile", "coffee-mobile" }} }, { "$1": {{ "t-mobile", "coffee-mobile", "iphone" }} }, { "$1": {{ "platform", "samsung", "t-mobile", "voice-command", "coffee-mobile" }} }, { "$1": {{ "platform", "verizon", "shortcut-menu", "t-mobile", "coffee-mobile" }} }, { "$1": {{ "platform", "motorola", "t-mobile", "speed", "coffee-mobile" }} }, { "$1": {{ "platform", "t-mobile [...]
+{ "t1": [ { "$1": {{ "customization", "coffee-mobile", "platform" }} }, { "$1": {{ "verizon", "voice-clarity", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "iphone", "coffee-mobile", "t-mobile" }} }, { "$1": {{ "samsung", "voice-command", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "verizon", "shortcut-menu", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "motorola", "speed", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "sprint", "voice-comm [...]
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiffn/array_symdiffn.3.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiffn/array_symdiffn.3.adm
index c92cfd0..b3528b4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiffn/array_symdiffn.3.adm
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/array_fun/array_symdiffn/array_symdiffn.3.adm
@@ -1 +1 @@
-{ "t1": [ { "$1": {{ "platform", "customization", "coffee-mobile" }} }, { "$1": {{ "platform", "verizon", "voice-clarity", "t-mobile", "coffee-mobile" }} }, { "$1": {{ "t-mobile", "coffee-mobile", "iphone" }} }, { "$1": {{ "platform", "samsung", "t-mobile", "voice-command", "coffee-mobile" }} }, { "$1": {{ "platform", "verizon", "shortcut-menu", "t-mobile", "coffee-mobile" }} }, { "$1": {{ "platform", "motorola", "t-mobile", "speed", "coffee-mobile" }} }, { "$1": {{ "platform", "t-mobile [...]
+{ "t1": [ { "$1": {{ "customization", "coffee-mobile", "platform" }} }, { "$1": {{ "verizon", "voice-clarity", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "iphone", "coffee-mobile", "t-mobile" }} }, { "$1": {{ "samsung", "voice-command", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "verizon", "shortcut-menu", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "motorola", "speed", "coffee-mobile", "t-mobile", "platform" }} }, { "$1": {{ "sprint", "voice-comm [...]
\ No newline at end of file
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
index 1100a59..1ed467c 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayAddRemoveEval.java
@@ -49,6 +49,8 @@ public abstract class AbstractArrayAddRemoveEval implements IScalarEvaluator {
     private final IAType[] argTypes;
     private final ArrayBackedValueStorage storage;
     private final IPointable listArg;
+    private final IPointable tempList;
+    private final IPointable tempItem;
     private final IPointable[] valuesArgs;
     private final IScalarEvaluator listArgEval;
     private final IScalarEvaluator[] valuesEval;
@@ -79,6 +81,8 @@ public abstract class AbstractArrayAddRemoveEval implements IScalarEvaluator {
         caster = new CastTypeEvaluator();
         storage = new ArrayBackedValueStorage();
         listArg = new VoidPointable();
+        tempList = new VoidPointable();
+        tempItem = new VoidPointable();
         listArgEval = args[listOffset].createScalarEvaluator(ctx);
         valuesArgs = new IPointable[numValues];
         valuesEval = new IScalarEvaluator[numValues];
@@ -100,11 +104,11 @@ public abstract class AbstractArrayAddRemoveEval implements IScalarEvaluator {
     @Override
     public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
         // get the list argument, 1st or last argument, make sure it's a list
-        listArgEval.evaluate(tuple, listArg);
-        ATypeTag listArgTag = ATYPETAGDESERIALIZER.deserialize(listArg.getByteArray()[listArg.getStartOffset()]);
+        listArgEval.evaluate(tuple, tempList);
+        ATypeTag listArgTag = ATYPETAGDESERIALIZER.deserialize(tempList.getByteArray()[tempList.getStartOffset()]);
 
         // evaluate the position argument if provided by some functions
-        int adjustedPosition = getPosition(tuple, listArg, listArgTag);
+        int adjustedPosition = getPosition(tuple, tempList, listArgTag);
 
         if (listArgTag == ATypeTag.MISSING || adjustedPosition == RETURN_MISSING) {
             PointableHelper.setMissing(result);
@@ -120,76 +124,81 @@ public abstract class AbstractArrayAddRemoveEval implements IScalarEvaluator {
         ATypeTag valueTag;
         IAType defaultOpenType;
         boolean encounteredNonPrimitive = false;
-        for (int i = 0; i < valuesEval.length; i++) {
-            // cast val to open if needed. don't cast if function will return null anyway, e.g. list arg was not list
-            defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(argTypes[i + valuesOffset].getTypeTag());
-            if (defaultOpenType != null && !returnNull) {
-                caster.reset(defaultOpenType, argTypes[i + valuesOffset], valuesEval[i]);
-                caster.evaluate(tuple, valuesArgs[i]);
-            } else {
-                valuesEval[i].evaluate(tuple, valuesArgs[i]);
-            }
-            valueTag = ATYPETAGDESERIALIZER.deserialize(valuesArgs[i].getByteArray()[valuesArgs[i].getStartOffset()]);
-            // for now, we don't support deep equality of object/lists. Throw an error if the value is of these types
-            if (comparesValues && valueTag.isDerivedType()) {
-                encounteredNonPrimitive = true;
+        try {
+            for (int i = 0; i < valuesEval.length; i++) {
+                // cast val to open if needed. don't cast if function will return null anyway, e.g. list arg not list
+                defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(argTypes[i + valuesOffset].getTypeTag());
+                if (defaultOpenType != null && !returnNull) {
+                    caster.resetAndAllocate(defaultOpenType, argTypes[i + valuesOffset], valuesEval[i]);
+                    caster.evaluate(tuple, valuesArgs[i]);
+                } else {
+                    valuesEval[i].evaluate(tuple, valuesArgs[i]);
+                }
+                valueTag =
+                        ATYPETAGDESERIALIZER.deserialize(valuesArgs[i].getByteArray()[valuesArgs[i].getStartOffset()]);
+                // for now, we don't support deep equality of object/lists. Throw an error if value is of these types
+                if (comparesValues && valueTag.isDerivedType()) {
+                    encounteredNonPrimitive = true;
+                }
+                if (valueTag == ATypeTag.MISSING) {
+                    PointableHelper.setMissing(result);
+                    return;
+                }
+                if (!acceptNullValues && valueTag == ATypeTag.NULL) {
+                    returnNull = true;
+                }
             }
-            if (valueTag == ATypeTag.MISSING) {
-                PointableHelper.setMissing(result);
+
+            if (returnNull) {
+                PointableHelper.setNull(result);
                 return;
             }
-            if (!acceptNullValues && valueTag == ATypeTag.NULL) {
-                returnNull = true;
-            }
-        }
-
-        if (returnNull) {
-            PointableHelper.setNull(result);
-            return;
-        }
 
-        if (encounteredNonPrimitive) {
-            throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLocation);
-        }
-        // all arguments are valid
-        AbstractCollectionType listType;
-        IAsterixListBuilder listBuilder;
-        // create the new list to be returned. cast the input list and make it open if required
-        if (listArgTag == ATypeTag.ARRAY) {
-            if (orderedListBuilder == null) {
-                orderedListBuilder = new OrderedListBuilder();
-            }
-            listBuilder = orderedListBuilder;
-            if (makeOpen || argTypes[listOffset].getTypeTag() != ATypeTag.ARRAY) {
-                listType = DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
-                caster.reset(listType, argTypes[listOffset], listArgEval);
-                caster.evaluate(tuple, listArg);
-            } else {
-                listType = (AbstractCollectionType) argTypes[listOffset];
-            }
-        } else {
-            if (unorderedListBuilder == null) {
-                unorderedListBuilder = new UnorderedListBuilder();
+            if (encounteredNonPrimitive) {
+                throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLocation);
             }
-            listBuilder = unorderedListBuilder;
-            if (makeOpen || argTypes[listOffset].getTypeTag() != ATypeTag.MULTISET) {
-                listType = DefaultOpenFieldType.NESTED_OPEN_AUNORDERED_LIST_TYPE;
-                caster.reset(listType, argTypes[listOffset], listArgEval);
-                caster.evaluate(tuple, listArg);
+            // all arguments are valid
+            AbstractCollectionType listType;
+            IAsterixListBuilder listBuilder;
+            // create the new list to be returned. cast the input list and make it open if required
+            if (listArgTag == ATypeTag.ARRAY) {
+                if (orderedListBuilder == null) {
+                    orderedListBuilder = new OrderedListBuilder();
+                }
+                listBuilder = orderedListBuilder;
+                if (makeOpen || argTypes[listOffset].getTypeTag() != ATypeTag.ARRAY) {
+                    listType = DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
+                    caster.resetAndAllocate(listType, argTypes[listOffset], listArgEval);
+                    caster.cast(tempList, listArg);
+                } else {
+                    listType = (AbstractCollectionType) argTypes[listOffset];
+                    listArg.set(tempList);
+                }
             } else {
-                listType = (AbstractCollectionType) argTypes[listOffset];
+                if (unorderedListBuilder == null) {
+                    unorderedListBuilder = new UnorderedListBuilder();
+                }
+                listBuilder = unorderedListBuilder;
+                if (makeOpen || argTypes[listOffset].getTypeTag() != ATypeTag.MULTISET) {
+                    listType = DefaultOpenFieldType.NESTED_OPEN_AUNORDERED_LIST_TYPE;
+                    caster.resetAndAllocate(listType, argTypes[listOffset], listArgEval);
+                    caster.cast(tempList, listArg);
+                } else {
+                    listType = (AbstractCollectionType) argTypes[listOffset];
+                    listArg.set(tempList);
+                }
             }
-        }
 
-        listBuilder.reset(listType);
-        listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
-        try {
+            listBuilder.reset(listType);
+            listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
             processList(listAccessor, listBuilder, valuesArgs, adjustedPosition);
             storage.reset();
             listBuilder.write(storage.getDataOutput(), true);
             result.set(storage);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
+        } finally {
+            caster.deallocatePointables();
         }
     }
 
@@ -197,18 +206,16 @@ public abstract class AbstractArrayAddRemoveEval implements IScalarEvaluator {
             int position) throws IOException {
         int i;
         for (i = 0; i < position; i++) {
-            storage.reset();
-            listAccessor.writeItem(i, storage.getDataOutput());
-            listBuilder.addItem(storage);
+            listAccessor.getOrWriteItem(i, tempItem, storage);
+            listBuilder.addItem(tempItem);
         }
         // insert the values arguments
         for (int j = 0; j < values.length; j++) {
             listBuilder.addItem(values[j]);
         }
         for (; i < listAccessor.size(); i++) {
-            storage.reset();
-            listAccessor.writeItem(i, storage.getDataOutput());
-            listBuilder.addItem(storage);
+            listAccessor.getOrWriteItem(i, tempItem, storage);
+            listBuilder.addItem(tempItem);
         }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
index 3258a8d..1cf75bc 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessArraysEval.java
@@ -48,8 +48,9 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractArrayProcessArraysEval implements IScalarEvaluator {
-    private ArrayBackedValueStorage finalResult;
+    private final ArrayBackedValueStorage finalResult;
     private final ListAccessor listAccessor;
+    private final IPointable tempList;
     private final IPointable[] listsArgs;
     private final IScalarEvaluator[] listsEval;
     private final SourceLocation sourceLocation;
@@ -70,6 +71,7 @@ public abstract class AbstractArrayProcessArraysEval implements IScalarEvaluator
         finalResult = new ArrayBackedValueStorage();
         listAccessor = new ListAccessor();
         caster = new CastTypeEvaluator();
+        tempList = new VoidPointable();
         listsArgs = new IPointable[args.length];
         listsEval = new IScalarEvaluator[args.length];
         for (int i = 0; i < args.length; i++) {
@@ -87,46 +89,46 @@ public abstract class AbstractArrayProcessArraysEval implements IScalarEvaluator
         boolean returnNull = false;
         AbstractCollectionType outList = null;
         ATypeTag listTag;
-        for (int i = 0; i < listsEval.length; i++) {
-            listsEval[i].evaluate(tuple, listsArgs[i]);
-            if (!returnNull) {
-                listArgType = listsArgs[i].getByteArray()[listsArgs[i].getStartOffset()];
-                listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
-                if (!listTag.isListType()) {
-                    returnNull = true;
-                } else if (outList != null && outList.getTypeTag() != listTag) {
-                    throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLocation);
-                } else {
-                    if (outList == null) {
-                        outList = (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
+        try {
+            for (int i = 0; i < listsEval.length; i++) {
+                listsEval[i].evaluate(tuple, tempList);
+                if (!returnNull) {
+                    listArgType = tempList.getByteArray()[tempList.getStartOffset()];
+                    listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
+                    if (!listTag.isListType()) {
+                        returnNull = true;
+                    } else if (outList != null && outList.getTypeTag() != listTag) {
+                        throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLocation);
+                    } else {
+                        if (outList == null) {
+                            outList = (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
+                        }
+
+                        caster.resetAndAllocate(outList, argTypes[i], listsEval[i]);
+                        caster.cast(tempList, listsArgs[i]);
                     }
-
-                    caster.reset(outList, argTypes[i], listsEval[i]);
-                    caster.evaluate(tuple, listsArgs[i]);
                 }
             }
-        }
-
-        if (returnNull) {
-            PointableHelper.setNull(result);
-            return;
-        }
 
-        IAsterixListBuilder listBuilder;
-        if (outList.getTypeTag() == ATypeTag.ARRAY) {
-            if (orderedListBuilder == null) {
-                orderedListBuilder = new OrderedListBuilder();
+            if (returnNull) {
+                PointableHelper.setNull(result);
+                return;
             }
-            listBuilder = orderedListBuilder;
-        } else {
-            if (unorderedListBuilder == null) {
-                unorderedListBuilder = new UnorderedListBuilder();
+
+            IAsterixListBuilder listBuilder;
+            if (outList.getTypeTag() == ATypeTag.ARRAY) {
+                if (orderedListBuilder == null) {
+                    orderedListBuilder = new OrderedListBuilder();
+                }
+                listBuilder = orderedListBuilder;
+            } else {
+                if (unorderedListBuilder == null) {
+                    unorderedListBuilder = new UnorderedListBuilder();
+                }
+                listBuilder = unorderedListBuilder;
             }
-            listBuilder = unorderedListBuilder;
-        }
 
-        listBuilder.reset(outList);
-        try {
+            listBuilder.reset(outList);
             init();
             processLists(listsArgs, listBuilder);
             finish(listBuilder);
@@ -140,6 +142,7 @@ public abstract class AbstractArrayProcessArraysEval implements IScalarEvaluator
             release();
             storageAllocator.reset();
             pointableAllocator.reset();
+            caster.deallocatePointables();
         }
     }
 
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessEval.java
index 9c4f671..9cdd32c 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArrayProcessEval.java
@@ -31,6 +31,7 @@ import org.apache.asterix.om.types.AOrderedListType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AUnorderedListType;
 import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.asterix.om.util.container.IObjectPool;
@@ -47,6 +48,8 @@ import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractArrayProcessEval implements IScalarEvaluator {
+    private final AOrderedListType orderedListType;
+    private final AUnorderedListType unorderedListType;
     private final ArrayBackedValueStorage storage;
     private final IScalarEvaluator listArgEval;
     private final ListAccessor listAccessor;
@@ -63,6 +66,8 @@ public abstract class AbstractArrayProcessEval implements IScalarEvaluator {
             throws HyracksDataException {
         orderedListBuilder = null;
         unorderedListBuilder = null;
+        orderedListType = new AOrderedListType(BuiltinType.ANY, null);
+        unorderedListType = new AUnorderedListType(BuiltinType.ANY, null);
         storage = new ArrayBackedValueStorage();
         listArg = new VoidPointable();
         pointableAllocator = new PointableAllocator();
@@ -101,9 +106,12 @@ public abstract class AbstractArrayProcessEval implements IScalarEvaluator {
         if (!inputListType.getTypeTag().isListType()) {
             ATypeTag itemType = listAccessor.getItemType();
             if (listAccessor.getListType() == ATypeTag.ARRAY) {
-                outputListType = new AOrderedListType(TypeTagUtil.getBuiltinTypeByTag(itemType), null);
+                // TODO(ali): check the case when the item type from the runtime is a derived type
+                orderedListType.setItemType(TypeTagUtil.getBuiltinTypeByTag(itemType));
+                outputListType = orderedListType;
             } else {
-                outputListType = new AUnorderedListType(TypeTagUtil.getBuiltinTypeByTag(itemType), null);
+                unorderedListType.setItemType(TypeTagUtil.getBuiltinTypeByTag(itemType));
+                outputListType = unorderedListType;
             }
         } else {
             outputListType = (AbstractCollectionType) inputListType;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
index 6677a1f..8b5ba72 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AbstractArraySearchEval.java
@@ -41,12 +41,13 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 public abstract class AbstractArraySearchEval implements IScalarEvaluator {
     private final IPointable listArg;
     private final IPointable searchedValueArg;
+    private final IPointable tempVal;
     private final IScalarEvaluator listEval;
     private final IScalarEvaluator searchedValueEval;
     private final IBinaryComparator comp;
     private final ListAccessor listAccessor;
     private final SourceLocation sourceLocation;
-    protected final AMutableInt32 intValue;
+    private final AMutableInt32 intValue;
     protected final ArrayBackedValueStorage storage;
 
     public AbstractArraySearchEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLoc)
@@ -54,6 +55,7 @@ public abstract class AbstractArraySearchEval implements IScalarEvaluator {
         storage = new ArrayBackedValueStorage();
         listArg = new VoidPointable();
         searchedValueArg = new VoidPointable();
+        tempVal = new VoidPointable();
         listEval = args[0].createScalarEvaluator(ctx);
         searchedValueEval = args[1].createScalarEvaluator(ctx);
         comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
@@ -92,9 +94,8 @@ public abstract class AbstractArraySearchEval implements IScalarEvaluator {
 
         try {
             for (int i = 0; i < numItems; i++) {
-                storage.reset();
-                listAccessor.writeItem(i, storage.getDataOutput());
-                if (comp.compare(storage.getByteArray(), storage.getStartOffset(), storage.getLength(), valueBytes,
+                listAccessor.getOrWriteItem(i, tempVal, storage);
+                if (comp.compare(tempVal.getByteArray(), tempVal.getStartOffset(), tempVal.getLength(), valueBytes,
                         valueOffset, valueLength) == 0) {
                     intValue.setValue(i);
                     break;
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
index 648132c..0179d34 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayDistinctDescriptor.java
@@ -124,8 +124,6 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri
             this.sourceLoc = sourceLoc;
             hashes = new Int2ObjectOpenHashMap<>();
             comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
-            item = pointableAllocator.allocateEmpty();
-            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                     .createBinaryHashFunction();
         }
@@ -137,6 +135,8 @@ public class ArrayDistinctDescriptor extends AbstractScalarFunctionDynamicDescri
             boolean nullMissingWasAdded = false;
             List<IPointable> sameHashes;
             hashes.clear();
+            item = pointableAllocator.allocateEmpty();
+            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             for (int i = 0; i < listAccessor.size(); i++) {
                 // get the item and compute its hash
                 itemInStorage = listAccessor.getOrWriteItem(i, item, storage);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
index 06381b5..fc8fc84 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayFlattenDescriptor.java
@@ -120,11 +120,12 @@ public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescrip
         private final IScalarEvaluator listEval;
         private final IScalarEvaluator depthEval;
         private final IPointable list;
-        private final AbstractPointable item;
+        private final AbstractPointable pointable;
         private final TaggedValuePointable depthArg;
         private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
         private final IObjectPool<ListAccessor, ATypeTag> listAccessorAllocator;
         private final CastTypeEvaluator caster;
+        private final ArrayBackedValueStorage finalStorage;
         private ArrayBackedValueStorage storage;
         private IAsterixListBuilder orderedListBuilder;
         private IAsterixListBuilder unorderedListBuilder;
@@ -132,11 +133,11 @@ public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescrip
         public ArrayFlattenEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
             listAccessorAllocator = new ListObjectPool<>(new ListAccessorFactory());
-            storage = new ArrayBackedValueStorage();
+            finalStorage = new ArrayBackedValueStorage();
             listEval = args[0].createScalarEvaluator(ctx);
             depthEval = args[1].createScalarEvaluator(ctx);
             list = new VoidPointable();
-            item = new VoidPointable();
+            pointable = new VoidPointable();
             caster = new CastTypeEvaluator();
             depthArg = new TaggedValuePointable();
             orderedListBuilder = null;
@@ -146,11 +147,11 @@ public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescrip
         @Override
         public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
             // 1st arg: list to flatten
-            listEval.evaluate(tuple, list);
+            listEval.evaluate(tuple, pointable);
             // 2nd arg: depthArg
             depthEval.evaluate(tuple, depthArg);
 
-            ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(list.getByteArray()[list.getStartOffset()]);
+            ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(pointable.getByteArray()[pointable.getStartOffset()]);
             if (!ATypeHierarchy.isCompatible(ATYPETAGDESERIALIZER.deserialize(depthArg.getTag()), ATypeTag.DOUBLE)
                     || !listType.isListType()) {
                 PointableHelper.setNull(result);
@@ -163,37 +164,41 @@ public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescrip
                 return;
             }
 
-            caster.reset(DefaultOpenFieldType.getDefaultOpenFieldType(listType), inputListType, listEval);
-            caster.evaluate(tuple, list);
-
-            int depthInt = (int) depth;
-            // create list
-            IAsterixListBuilder listBuilder;
-            if (listType == ATypeTag.ARRAY) {
-                if (orderedListBuilder == null) {
-                    orderedListBuilder = new OrderedListBuilder();
-                }
-                listBuilder = orderedListBuilder;
-            } else {
-                if (unorderedListBuilder == null) {
-                    unorderedListBuilder = new UnorderedListBuilder();
+            try {
+                caster.resetAndAllocate(DefaultOpenFieldType.getDefaultOpenFieldType(listType), inputListType,
+                        listEval);
+                caster.cast(pointable, list);
+
+                int depthInt = (int) depth;
+                // create list
+                IAsterixListBuilder listBuilder;
+                if (listType == ATypeTag.ARRAY) {
+                    if (orderedListBuilder == null) {
+                        orderedListBuilder = new OrderedListBuilder();
+                    }
+                    listBuilder = orderedListBuilder;
+                } else {
+                    if (unorderedListBuilder == null) {
+                        unorderedListBuilder = new UnorderedListBuilder();
+                    }
+                    listBuilder = unorderedListBuilder;
                 }
-                listBuilder = unorderedListBuilder;
-            }
 
-            ListAccessor mainListAccessor = listAccessorAllocator.allocate(null);
-            listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType));
-            mainListAccessor.reset(list.getByteArray(), list.getStartOffset());
-            try {
+                ListAccessor mainListAccessor = listAccessorAllocator.allocate(null);
+                listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType));
+                mainListAccessor.reset(list.getByteArray(), list.getStartOffset());
+
+                storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
                 process(mainListAccessor, listBuilder, 0, depthInt);
-                storage.reset();
-                listBuilder.write(storage.getDataOutput(), true);
-                result.set(storage);
+                finalStorage.reset();
+                listBuilder.write(finalStorage.getDataOutput(), true);
+                result.set(finalStorage);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             } finally {
                 storageAllocator.reset();
                 listAccessorAllocator.reset();
+                caster.deallocatePointables();
             }
         }
 
@@ -201,15 +206,15 @@ public class ArrayFlattenDescriptor extends AbstractScalarFunctionDynamicDescrip
                 throws IOException {
             boolean itemInStorage;
             for (int i = 0; i < listAccessor.size(); i++) {
-                itemInStorage = listAccessor.getOrWriteItem(i, item, storage);
+                itemInStorage = listAccessor.getOrWriteItem(i, pointable, storage);
                 // if item is not a list or depth is reached, write it
-                if (!ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isListType()
+                if (!ATYPETAGDESERIALIZER.deserialize(pointable.getByteArray()[pointable.getStartOffset()]).isListType()
                         || currentDepth == depth) {
-                    listBuilder.addItem(item);
+                    listBuilder.addItem(pointable);
                 } else {
                     // recurse on the sublist
                     ListAccessor newListAccessor = listAccessorAllocator.allocate(null);
-                    newListAccessor.reset(item.getByteArray(), item.getStartOffset());
+                    newListAccessor.reset(pointable.getByteArray(), pointable.getStartOffset());
                     if (itemInStorage) {
                         // create a new storage since the item is using it
                         storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
index 52335a0..85ba01f 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayIntersectDescriptor.java
@@ -21,11 +21,8 @@ package org.apache.asterix.runtime.evaluators.functions;
 import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.AbvsBuilderFactory;
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
@@ -46,6 +43,7 @@ import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.AbstractCollectionType;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
 import org.apache.asterix.om.util.container.IObjectPool;
 import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
@@ -67,6 +65,9 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+
 /**
  * <pre>
  * array_intersect(list1, list2, ...) returns a new list containing items that are present in all of the input
@@ -101,11 +102,37 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
         }
     };
 
-    public class ValueListIndex implements IValueReference {
-        private final IPointable value;
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.ARRAY_INTERSECT;
+    }
+
+    @Override
+    public void setImmutableStates(Object... states) {
+        argTypes = (IAType[]) states;
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new IScalarEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
+                return new ArrayIntersectEval(args, ctx);
+            }
+        };
+    }
+
+    protected class ValueListIndex implements IValueReference {
+        private IPointable value;
         private int listIndex;
 
-        public ValueListIndex(IPointable value, int listIndex) {
+        protected ValueListIndex() {
+        }
+
+        protected void set(IPointable value, int listIndex) {
             this.value = value;
             this.listIndex = listIndex;
         }
@@ -126,31 +153,21 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
         }
     }
 
-    @Override
-    public FunctionIdentifier getIdentifier() {
-        return BuiltinFunctions.ARRAY_INTERSECT;
-    }
-
-    @Override
-    public void setImmutableStates(Object... states) {
-        argTypes = (IAType[]) states;
-    }
+    protected class ValueListIndexAllocator implements IObjectFactory<ValueListIndex, ATypeTag> {
 
-    @Override
-    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
-            throws AlgebricksException {
-        return new IScalarEvaluatorFactory() {
-            private static final long serialVersionUID = 1L;
+        protected ValueListIndexAllocator() {
+        }
 
-            @Override
-            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException {
-                return new ArrayIntersectEval(args, ctx);
-            }
-        };
+        @Override
+        public ValueListIndex create(ATypeTag arg) {
+            return new ValueListIndex();
+        }
     }
 
     public class ArrayIntersectEval implements IScalarEvaluator {
         private final ListAccessor listAccessor;
+        private final IPointable pointable;
+        private final ArrayBackedValueStorage currentItemStorage;
         private final IPointable[] listsArgs;
         private final IScalarEvaluator[] listsEval;
         private final IBinaryHashFunction binaryHashFunction;
@@ -158,6 +175,7 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
         private final PointableAllocator pointableAllocator;
         private final IObjectPool<IMutableValueStorage, ATypeTag> storageAllocator;
         private final IObjectPool<List<ValueListIndex>, ATypeTag> arrayListAllocator;
+        private final IObjectPool<ValueListIndex, ATypeTag> valueListIndexAllocator;
         private final ArrayBackedValueStorage finalResult;
         private final CastTypeEvaluator caster;
         private final IBinaryComparator comp;
@@ -170,6 +188,7 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             pointableAllocator = new PointableAllocator();
             storageAllocator = new ListObjectPool<>(new AbvsBuilderFactory());
             arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
+            valueListIndexAllocator = new ListObjectPool<>(new ValueListIndexAllocator());
             hashes = new Int2ObjectOpenHashMap<>();
             finalResult = new ArrayBackedValueStorage();
             listAccessor = new ListAccessor();
@@ -177,6 +196,8 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             listsArgs = new IPointable[args.length];
             listsEval = new IScalarEvaluator[args.length];
+            pointable = new VoidPointable();
+            currentItemStorage = new ArrayBackedValueStorage();
             for (int i = 0; i < args.length; i++) {
                 listsArgs[i] = new VoidPointable();
                 listsEval[i] = args[i].createScalarEvaluator(ctx);
@@ -194,72 +215,70 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             int minListIndex = 0;
             int minSize = -1;
             int nextSize;
-            IScalarEvaluator listEval;
-            IPointable listArg;
 
             // evaluate all the lists first to make sure they're all actually lists and of the same list type
-            for (int i = 0; i < listsEval.length; i++) {
-                listEval = listsEval[i];
-                listEval.evaluate(tuple, listsArgs[i]);
-                if (!returnNull) {
-                    listArg = listsArgs[i];
-                    listArgType = listArg.getByteArray()[listArg.getStartOffset()];
-                    listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
-                    if (!listTag.isListType()) {
-                        returnNull = true;
-                    } else if (outList != null && outList.getTypeTag() != listTag) {
-                        throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
-                    } else {
-                        if (outList == null) {
-                            outList = (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
-                        }
-
-                        caster.reset(outList, argTypes[i], listsEval[i]);
-                        caster.evaluate(tuple, listsArgs[i]);
-                        nextSize = getNumItems(outList, listArg.getByteArray(), listArg.getStartOffset());
-                        if (nextSize < minSize) {
-                            minSize = nextSize;
-                            minListIndex = i;
+            try {
+                for (int i = 0; i < listsEval.length; i++) {
+                    listsEval[i].evaluate(tuple, pointable);
+                    if (!returnNull) {
+                        listArgType = pointable.getByteArray()[pointable.getStartOffset()];
+                        listTag = ATYPETAGDESERIALIZER.deserialize(listArgType);
+                        if (!listTag.isListType()) {
+                            returnNull = true;
+                        } else if (outList != null && outList.getTypeTag() != listTag) {
+                            throw new RuntimeDataException(ErrorCode.DIFFERENT_LIST_TYPE_ARGS, sourceLoc);
+                        } else {
+                            if (outList == null) {
+                                outList =
+                                        (AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listTag);
+                            }
+
+                            caster.resetAndAllocate(outList, argTypes[i], listsEval[i]);
+                            caster.cast(pointable, listsArgs[i]);
+                            nextSize = getNumItems(outList, listsArgs[i].getByteArray(), listsArgs[i].getStartOffset());
+                            if (nextSize < minSize || minSize == -1) {
+                                minSize = nextSize;
+                                minListIndex = i;
+                            }
                         }
                     }
                 }
-            }
 
-            if (returnNull) {
-                PointableHelper.setNull(result);
-                return;
-            }
-
-            IAsterixListBuilder listBuilder;
-            if (outList.getTypeTag() == ATypeTag.ARRAY) {
-                if (orderedListBuilder == null) {
-                    orderedListBuilder = new OrderedListBuilder();
+                if (returnNull) {
+                    PointableHelper.setNull(result);
+                    return;
                 }
-                listBuilder = orderedListBuilder;
-            } else {
-                if (unorderedListBuilder == null) {
-                    unorderedListBuilder = new UnorderedListBuilder();
+
+                IAsterixListBuilder listBuilder;
+                if (outList.getTypeTag() == ATypeTag.ARRAY) {
+                    if (orderedListBuilder == null) {
+                        orderedListBuilder = new OrderedListBuilder();
+                    }
+                    listBuilder = orderedListBuilder;
+                } else {
+                    if (unorderedListBuilder == null) {
+                        unorderedListBuilder = new UnorderedListBuilder();
+                    }
+                    listBuilder = unorderedListBuilder;
                 }
-                listBuilder = unorderedListBuilder;
-            }
 
-            hashes.clear();
-            try {
-                // first, get distinct items of the most restrictive (smallest) list, pass listBuilder as null since
-                // we're not adding values yet. Values will be added to listBuilder after inspecting all input lists
+                IPointable listArg;
+                hashes.clear();
+
+                // first, get distinct items of the most restrictive (smallest) list.
+                // values will be added to listBuilder after inspecting all input lists
                 listArg = listsArgs[minListIndex];
                 listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
-                processList(listAccessor, minListIndex, null, true);
-
-                // now process each list one by one
+                buildRestrictiveList(listAccessor);
                 listBuilder.reset(outList);
-                for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) {
-                    if (listIndex == minListIndex) {
-                        incrementSmallest(listIndex, hashes.values());
-                    } else {
+
+                if (!hashes.isEmpty()) {
+                    // process each list one by one
+                    for (int listIndex = 0; listIndex < listsArgs.length; listIndex++) {
+                        // TODO(ali): find a way to avoid comparing the smallest list
                         listArg = listsArgs[listIndex];
                         listAccessor.reset(listArg.getByteArray(), listArg.getStartOffset());
-                        processList(listAccessor, listIndex, listBuilder, false);
+                        processList(listAccessor, listIndex, listBuilder);
                     }
                 }
 
@@ -269,6 +288,8 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
             } finally {
+                caster.deallocatePointables();
+                valueListIndexAllocator.reset();
                 storageAllocator.reset();
                 arrayListAllocator.reset();
                 pointableAllocator.reset();
@@ -283,57 +304,75 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             }
         }
 
-        private void processList(ListAccessor listAccessor, int listIndex, IAsterixListBuilder listBuilder,
-                boolean initIntersectList) throws IOException {
+        // puts all the items of the smallest list in "hashes"
+        private void buildRestrictiveList(ListAccessor listAccessor) throws IOException {
+            if (listAccessor.size() > 0) {
+                int hash;
+                List<ValueListIndex> sameHashes;
+                boolean itemInStorage;
+                IPointable item = pointableAllocator.allocateEmpty();
+                ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+                storage.reset();
+                for (int j = 0; j < listAccessor.size(); j++) {
+                    itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
+                    validateItem(item);
+                    if (notNullAndMissing(item)) {
+                        hash = binaryHashFunction.hash(item.getByteArray(), item.getStartOffset(), item.getLength());
+                        sameHashes = hashes.get(hash);
+                        if (addToSmallestList(item, hash, sameHashes)) {
+                            // item has been added to intersect list and is being used, allocate new pointable
+                            item = pointableAllocator.allocateEmpty();
+                            if (itemInStorage) {
+                                storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
+                                storage.reset();
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        private void processList(ListAccessor listAccessor, int listIndex, IAsterixListBuilder listBuilder)
+                throws IOException {
             int hash;
             List<ValueListIndex> sameHashes;
-            boolean itemInStorage;
-            IPointable item = pointableAllocator.allocateEmpty();
-            ArrayBackedValueStorage storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
-            storage.reset();
             for (int j = 0; j < listAccessor.size(); j++) {
-                itemInStorage = listAccessor.getOrWriteItem(j, item, storage);
-                if (ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType()) {
-                    throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
-                }
-                if (notNullAndMissing(item)) {
-                    // look up to see if item exists
-                    hash = binaryHashFunction.hash(item.getByteArray(), item.getStartOffset(), item.getLength());
+                listAccessor.getOrWriteItem(j, pointable, currentItemStorage);
+                validateItem(pointable);
+                if (notNullAndMissing(pointable)) {
+                    // hash the item and look up to see if it is common
+                    hash = binaryHashFunction.hash(pointable.getByteArray(), pointable.getStartOffset(),
+                            pointable.getLength());
                     sameHashes = hashes.get(hash);
-                    if (initIntersectList && initIntersectList(item, hash, sameHashes)) {
-                        // item is used
-                        item = pointableAllocator.allocateEmpty();
-                        if (itemInStorage) {
-                            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
-                            storage.reset();
-                        }
-                    } else {
-                        incrementCommonValue(item, sameHashes, listIndex, listBuilder);
-                    }
+                    incrementIfCommonValue(pointable, sameHashes, listIndex, listBuilder);
                 }
             }
         }
 
-        // collect the items of the most restrictive list, it initializes the list index as -1. each successive list
+        // collects the items of the most restrictive list and initializes the list index as -1. Each successive list
         // should stamp the value with its list index if the list has the item. It starts with list index = 0
-        private boolean initIntersectList(IPointable item, int hash, List<ValueListIndex> sameHashes)
+        private boolean addToSmallestList(IPointable item, int hash, List<ValueListIndex> sameHashes)
                 throws IOException {
             // add if new item
             if (sameHashes == null) {
                 List<ValueListIndex> newHashes = arrayListAllocator.allocate(null);
                 newHashes.clear();
-                newHashes.add(new ValueListIndex(item, -1));
+                ValueListIndex valueListIndex = valueListIndexAllocator.allocate(null);
+                valueListIndex.set(item, -1);
+                newHashes.add(valueListIndex);
                 hashes.put(hash, newHashes);
                 return true;
             } else if (ArrayFunctionsUtil.findItem(item, sameHashes, comp) == null) {
-                sameHashes.add(new ValueListIndex(item, -1));
+                ValueListIndex valueListIndex = valueListIndexAllocator.allocate(null);
+                valueListIndex.set(item, -1);
+                sameHashes.add(valueListIndex);
                 return true;
             }
             // else ignore for duplicate values in the same list
             return false;
         }
 
-        private void incrementCommonValue(IPointable item, List<ValueListIndex> sameHashes, int listIndex,
+        private void incrementIfCommonValue(IPointable item, List<ValueListIndex> sameHashes, int listIndex,
                 IAsterixListBuilder listBuilder) throws IOException {
             if (sameHashes != null) {
                 // look for the same equal item, add to list builder when all lists have seen this item
@@ -346,33 +385,25 @@ public class ArrayIntersectDescriptor extends AbstractScalarFunctionDynamicDescr
             return tag != ATypeTag.SERIALIZED_NULL_TYPE_TAG && tag != ATypeTag.SERIALIZED_MISSING_TYPE_TAG;
         }
 
-        // this method is only for the most restrictive list. it avoids comparison since it is the initial list we start
-        // with, so for sure every element in the collection must exist in the list
-        private void incrementSmallest(int listIndex, Collection<List<ValueListIndex>> commonValues) {
-            for (List<ValueListIndex> items : commonValues) {
-                for (int i = 0; i < items.size(); i++) {
-                    // any difference that is not == 1 means either this current list has already stamped and advanced
-                    // the stamp or the item is not common among lists because if it's common then each list should've
-                    // incremented the item list index up to the current list index
-                    if (listIndex - items.get(i).listIndex == 1) {
-                        items.get(i).listIndex = listIndex;
-                    }
-                }
-            }
-        }
-
         private void incrementIfExists(List<ValueListIndex> sameHashes, IPointable item, int listIndex,
                 IAsterixListBuilder listBuilder) throws HyracksDataException {
             ValueListIndex sameValue = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
             if (sameValue != null && listIndex - sameValue.listIndex == 1) {
-                // found the item, its stamp is OK (stamp saves the last list index that has seen this item)
+                // found the item, its stamp is OK (stamp saves the index of the last list that has seen this item)
                 // increment stamp of this item
                 sameValue.listIndex = listIndex;
                 if (listIndex == listsArgs.length - 1) {
-                    // when listIndex is the last list, then it means this item was found in all previous lists
+                    // if this list is the last to stamp, then add to the final result
                     listBuilder.addItem(item);
                 }
             }
         }
+
+        // validates that the item is not of a derived type; multisets, objects and arrays are not yet supported
+        private void validateItem(IPointable item) throws RuntimeDataException {
+            if (ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType()) {
+                throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
+            }
+        }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
index 86321da..571fb4c 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayPutDescriptor.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
@@ -101,12 +102,16 @@ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor
 
     public class ArrayPutEval extends AbstractArrayAddRemoveEval {
         private final ArrayBackedValueStorage storage;
+        private final IPointable item;
         private final IBinaryComparator comp;
+        private final boolean[] add;
 
         public ArrayPutEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, true, false);
             comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
             storage = new ArrayBackedValueStorage();
+            item = new VoidPointable();
+            add = new boolean[args.length - 1];
         }
 
         @Override
@@ -125,28 +130,32 @@ public class ArrayPutDescriptor extends AbstractScalarFunctionDynamicDescriptor
         @Override
         protected void processList(ListAccessor listAccessor, IAsterixListBuilder listBuilder, IPointable[] values,
                 int position) throws IOException {
-            boolean[] dontAdd = new boolean[values.length];
+            markAllToBeAdded();
             // get the list items one by one and append to the new list
             for (int i = 0; i < listAccessor.size(); i++) {
-                storage.reset();
-                listAccessor.writeItem(i, storage.getDataOutput());
-                listBuilder.addItem(storage);
+                listAccessor.getOrWriteItem(i, item, storage);
+                listBuilder.addItem(item);
                 // mark the equal values to skip adding them
                 for (int j = 0; j < values.length; j++) {
-                    if (!dontAdd[j]
-                            && comp.compare(storage.getByteArray(), storage.getStartOffset(), storage.getLength(),
-                                    values[j].getByteArray(), values[j].getStartOffset(), values[j].getLength()) == 0) {
-                        dontAdd[j] = true;
+                    if (add[j] && comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(),
+                            values[j].getByteArray(), values[j].getStartOffset(), values[j].getLength()) == 0) {
+                        add[j] = false;
                     }
                     // skip comparison if the value is already marked
                 }
             }
             // append the values arguments only if they are not already present in the list, i.e. not marked
             for (int i = 0; i < values.length; i++) {
-                if (!dontAdd[i]) {
+                if (add[i]) {
                     listBuilder.addItem(values[i]);
                 }
             }
         }
+
+        private void markAllToBeAdded() {
+            for (int i = 0; i < add.length; i++) {
+                add[i] = true;
+            }
+        }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
index a5afa5f..79288e1 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayRemoveDescriptor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
@@ -98,11 +99,13 @@ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescript
 
     public class ArrayRemoveEval extends AbstractArrayAddRemoveEval {
         private final ArrayBackedValueStorage storage;
+        private final IPointable item;
         private final IBinaryComparator comp;
 
         public ArrayRemoveEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             super(args, ctx, 0, 1, args.length - 1, argTypes, true, sourceLoc, false, false);
             storage = new ArrayBackedValueStorage();
+            item = new VoidPointable();
             comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
         }
 
@@ -118,18 +121,17 @@ public class ArrayRemoveDescriptor extends AbstractScalarFunctionDynamicDescript
             // get the list items one by one and append to the new list only if the list item is not in removed list
             boolean addItem;
             for (int i = 0; i < listAccessor.size(); i++) {
-                storage.reset();
-                listAccessor.writeItem(i, storage.getDataOutput());
+                listAccessor.getOrWriteItem(i, item, storage);
                 addItem = true;
                 for (int j = 0; j < removed.length; j++) {
-                    if (comp.compare(storage.getByteArray(), storage.getStartOffset(), storage.getLength(),
+                    if (comp.compare(item.getByteArray(), item.getStartOffset(), item.getLength(),
                             removed[j].getByteArray(), removed[j].getStartOffset(), removed[j].getLength()) == 0) {
                         addItem = false;
                         break;
                     }
                 }
                 if (addItem) {
-                    listBuilder.addItem(storage);
+                    listBuilder.addItem(item);
                 }
             }
         }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
index 032ef32..f591b54 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayReplaceDescriptor.java
@@ -122,8 +122,10 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
         private final IScalarEvaluator newValEval;
         private IScalarEvaluator maxEval;
         private final IPointable list;
+        private final IPointable tempList;
         private final IPointable target;
         private final IPointable newVal;
+        private final IPointable tempVal;
         private TaggedValuePointable maxArg;
         private final AbstractPointable item;
         private final ListAccessor listAccessor;
@@ -143,8 +145,10 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
                 maxArg = new TaggedValuePointable();
             }
             list = new VoidPointable();
+            tempList = new VoidPointable();
             target = new VoidPointable();
             newVal = new VoidPointable();
+            tempVal = new VoidPointable();
             item = new VoidPointable();
             listAccessor = new ListAccessor();
             caster = new CastTypeEvaluator();
@@ -156,12 +160,12 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
         @Override
         public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
             storage.reset();
-            listEval.evaluate(tuple, list);
+            listEval.evaluate(tuple, tempList);
             targetValEval.evaluate(tuple, target);
-            newValEval.evaluate(tuple, newVal);
-            ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(list.getByteArray()[list.getStartOffset()]);
+            newValEval.evaluate(tuple, tempVal);
+            ATypeTag listType = ATYPETAGDESERIALIZER.deserialize(tempList.getByteArray()[tempList.getStartOffset()]);
             ATypeTag targetTag = ATYPETAGDESERIALIZER.deserialize(target.getByteArray()[target.getStartOffset()]);
-            ATypeTag newValTag = ATYPETAGDESERIALIZER.deserialize(newVal.getByteArray()[newVal.getStartOffset()]);
+            ATypeTag newValTag = ATYPETAGDESERIALIZER.deserialize(tempVal.getByteArray()[tempVal.getStartOffset()]);
             if (listType == ATypeTag.MISSING || targetTag == ATypeTag.MISSING || newValTag == ATypeTag.MISSING) {
                 PointableHelper.setMissing(result);
                 return;
@@ -192,34 +196,37 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
                 throw new RuntimeDataException(ErrorCode.CANNOT_COMPARE_COMPLEX, sourceLoc);
             }
 
-            IAType defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(listType);
-            caster.reset(defaultOpenType, inputListType, listEval);
-            caster.evaluate(tuple, list);
-
-            defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(newValTag);
-            if (defaultOpenType != null) {
-                caster.reset(defaultOpenType, newValueType, newValEval);
-                caster.evaluate(tuple, newVal);
-            }
-
-            int max = (int) maxDouble;
-            // create list
-            IAsterixListBuilder listBuilder;
-            if (listType == ATypeTag.ARRAY) {
-                if (orderedListBuilder == null) {
-                    orderedListBuilder = new OrderedListBuilder();
+            try {
+                IAType defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(listType);
+                caster.resetAndAllocate(defaultOpenType, inputListType, listEval);
+                caster.cast(tempList, list);
+
+                defaultOpenType = DefaultOpenFieldType.getDefaultOpenFieldType(newValTag);
+                if (defaultOpenType != null) {
+                    caster.resetAndAllocate(defaultOpenType, newValueType, newValEval);
+                    caster.cast(tempVal, newVal);
+                } else {
+                    newVal.set(tempVal);
                 }
-                listBuilder = orderedListBuilder;
-            } else {
-                if (unorderedListBuilder == null) {
-                    unorderedListBuilder = new UnorderedListBuilder();
+
+                int max = (int) maxDouble;
+                // create list
+                IAsterixListBuilder listBuilder;
+                if (listType == ATypeTag.ARRAY) {
+                    if (orderedListBuilder == null) {
+                        orderedListBuilder = new OrderedListBuilder();
+                    }
+                    listBuilder = orderedListBuilder;
+                } else {
+                    if (unorderedListBuilder == null) {
+                        unorderedListBuilder = new UnorderedListBuilder();
+                    }
+                    listBuilder = unorderedListBuilder;
                 }
-                listBuilder = unorderedListBuilder;
-            }
 
-            listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType));
-            listAccessor.reset(list.getByteArray(), list.getStartOffset());
-            try {
+                listBuilder.reset((AbstractCollectionType) DefaultOpenFieldType.getDefaultOpenFieldType(listType));
+                listAccessor.reset(list.getByteArray(), list.getStartOffset());
+
                 int counter = 0;
                 byte[] targetBytes = target.getByteArray();
                 int offset = target.getStartOffset();
@@ -239,6 +246,8 @@ public class ArrayReplaceDescriptor extends AbstractScalarFunctionDynamicDescrip
                 result.set(storage);
             } catch (IOException e) {
                 throw HyracksDataException.create(e);
+            } finally {
+                caster.deallocatePointables();
             }
         }
     }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
index 1d8db47..b2cc7ba 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySortDescriptor.java
@@ -127,8 +127,6 @@ public class ArraySortDescriptor extends AbstractScalarFunctionDynamicDescriptor
                 throws HyracksDataException {
             super(args, ctx, inputListType);
             this.sourceLoc = sourceLoc;
-            item = pointableAllocator.allocateEmpty();
-            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             sortedList = new PriorityQueue<>(new ArraySortComparator());
         }
 
@@ -136,6 +134,8 @@ public class ArraySortDescriptor extends AbstractScalarFunctionDynamicDescriptor
         protected void processList(ListAccessor listAccessor, IAsterixListBuilder listBuilder) throws IOException {
             sortedList.clear();
             boolean itemInStorage;
+            item = pointableAllocator.allocateEmpty();
+            storage = (ArrayBackedValueStorage) storageAllocator.allocate(null);
             for (int i = 0; i < listAccessor.size(); i++) {
                 itemInStorage = listAccessor.getOrWriteItem(i, item, storage);
                 if (ATYPETAGDESERIALIZER.deserialize(item.getByteArray()[item.getStartOffset()]).isDerivedType()) {
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
index 4bc885c..a247639 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArrayStarDescriptor.java
@@ -21,11 +21,12 @@ package org.apache.asterix.runtime.evaluators.functions;
 import static org.apache.asterix.om.types.EnumDeserializer.ATYPETAGDESERIALIZER;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.PriorityQueue;
 
+import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
 import org.apache.asterix.builders.RecordBuilder;
@@ -39,6 +40,9 @@ import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.pointables.base.IVisitablePointable;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.asterix.runtime.evaluators.common.ListAccessor;
 import org.apache.asterix.runtime.functions.FunctionTypeInferers;
@@ -50,6 +54,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
@@ -123,11 +128,11 @@ public class ArrayStarDescriptor extends AbstractScalarFunctionDynamicDescriptor
         };
     }
 
-    public class UTF8StringComparator implements Comparator<IVisitablePointable> {
+    public class UTF8StringComparator implements Comparator<IValueReference> {
         private final IBinaryComparator comp = PointableHelper.createStringBinaryComparator();
 
         @Override
-        public int compare(IVisitablePointable val1, IVisitablePointable val2) {
+        public int compare(IValueReference val1, IValueReference val2) {
             try {
                 return PointableHelper.compareStringBinValues(val1, val2, comp);
             } catch (HyracksDataException e) {
@@ -136,74 +141,119 @@ public class ArrayStarDescriptor extends AbstractScalarFunctionDynamicDescriptor
         }
     }
 
+    protected class FieldNameToValues implements IValueReference {
+        private IVisitablePointable fieldName;
+        private List<IVisitablePointable> values;
+
+        @Override
+        public byte[] getByteArray() {
+            return fieldName.getByteArray();
+        }
+
+        @Override
+        public int getStartOffset() {
+            return fieldName.getStartOffset();
+        }
+
+        @Override
+        public int getLength() {
+            return fieldName.getLength();
+        }
+    }
+
+    protected class FieldNameToValuesAllocator implements IObjectFactory<FieldNameToValues, ATypeTag> {
+
+        @Override
+        public FieldNameToValues create(ATypeTag arg) {
+            return new FieldNameToValues();
+        }
+    }
+
     public class ArrayStarEval implements IScalarEvaluator {
+        private final IBinaryComparator binaryStrComp = PointableHelper.createStringBinaryComparator();
         private final UTF8StringComparator comp = new UTF8StringComparator();
         private final ArrayBackedValueStorage storage;
         private final IScalarEvaluator listEval;
         private final IPointable list;
+        private final IPointable tempList;
         private final IPointable object;
         private final CastTypeEvaluator caster;
         private final ListAccessor listAccessor;
-        private final TreeMap<IVisitablePointable, IVisitablePointable[]> fieldNameToValues;
         private final RecordBuilder recordBuilder;
         private final IAsterixListBuilder listBuilder;
         private final PointableAllocator pointableAllocator;
+        private final List<FieldNameToValues> fieldNameToValuesList;
+        private final PriorityQueue<FieldNameToValues> tempMinHeap;
+        private final IObjectPool<List<IVisitablePointable>, ATypeTag> arrayListAllocator;
+        private final IObjectPool<FieldNameToValues, ATypeTag> fieldNameToValuesAllocator;
 
         public ArrayStarEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx) throws HyracksDataException {
             storage = new ArrayBackedValueStorage();
             object = new VoidPointable();
             list = new VoidPointable();
+            tempList = new VoidPointable();
             listEval = args[0].createScalarEvaluator(ctx);
             caster = new CastTypeEvaluator();
             listAccessor = new ListAccessor();
-            fieldNameToValues = new TreeMap<>(comp);
             recordBuilder = new RecordBuilder();
             listBuilder = new OrderedListBuilder();
             pointableAllocator = new PointableAllocator();
+            fieldNameToValuesList = new ArrayList<>();
+            tempMinHeap = new PriorityQueue<>(comp);
+            arrayListAllocator = new ListObjectPool<>(new ArrayListFactory<>());
+            fieldNameToValuesAllocator = new ListObjectPool<>(new FieldNameToValuesAllocator());
         }
 
         @Override
         public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
             storage.reset();
-            listEval.evaluate(tuple, list);
-            ATypeTag listTag = ATYPETAGDESERIALIZER.deserialize(list.getByteArray()[list.getStartOffset()]);
+            listEval.evaluate(tuple, tempList);
+            ATypeTag listTag = ATYPETAGDESERIALIZER.deserialize(tempList.getByteArray()[tempList.getStartOffset()]);
             if (listTag != ATypeTag.ARRAY) {
                 PointableHelper.setNull(result);
                 return;
             }
 
-            caster.reset(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE, inputListType, listEval);
-            caster.evaluate(tuple, list);
-
-            fieldNameToValues.clear();
-            listAccessor.reset(list.getByteArray(), list.getStartOffset());
-            int numObjects = listAccessor.size();
             try {
+                caster.resetAndAllocate(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE, inputListType, listEval);
+                caster.cast(tempList, list);
+
+                tempMinHeap.clear();
+                fieldNameToValuesList.clear();
+                listAccessor.reset(list.getByteArray(), list.getStartOffset());
+                int numObjects = listAccessor.size();
+
                 for (int objectIndex = 0; objectIndex < numObjects; objectIndex++) {
                     listAccessor.getOrWriteItem(objectIndex, object, storage);
                     processObject(object, objectIndex, numObjects);
                 }
 
-                if (fieldNameToValues.isEmpty()) {
+                if (fieldNameToValuesList.isEmpty()) {
                     PointableHelper.setMissing(result);
                     return;
                 }
-
+                for (int i = 0; i < fieldNameToValuesList.size(); i++) {
+                    tempMinHeap.add(fieldNameToValuesList.get(i));
+                }
                 recordBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
                 recordBuilder.init();
 
-                for (Map.Entry<IVisitablePointable, IVisitablePointable[]> e : fieldNameToValues.entrySet()) {
+                FieldNameToValues fieldNameToValues;
+                IVisitablePointable oneValue;
+                while (!tempMinHeap.isEmpty()) {
+                    fieldNameToValues = tempMinHeap.poll();
                     listBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
-                    for (int i = 0; i < e.getValue().length; i++) {
-                        if (e.getValue()[i] == null) {
+                    for (int k = 0; k < fieldNameToValues.values.size(); k++) {
+                        oneValue = fieldNameToValues.values.get(k);
+                        if (oneValue == null) {
                             listBuilder.addItem(PointableHelper.NULL_REF);
                         } else {
-                            listBuilder.addItem(e.getValue()[i]);
+                            listBuilder.addItem(oneValue);
                         }
                     }
                     storage.reset();
                     listBuilder.write(storage.getDataOutput(), true);
-                    recordBuilder.addField(e.getKey(), storage);
+                    recordBuilder.addField(fieldNameToValues.fieldName, storage);
                 }
 
                 storage.reset();
@@ -213,10 +263,13 @@ public class ArrayStarDescriptor extends AbstractScalarFunctionDynamicDescriptor
                 throw HyracksDataException.create(e);
             } finally {
                 pointableAllocator.reset();
+                arrayListAllocator.reset();
+                fieldNameToValuesAllocator.reset();
+                caster.deallocatePointables();
             }
         }
 
-        private void processObject(IPointable object, int objectIndex, int numObjects) {
+        private void processObject(IPointable object, int objectIndex, int numObjects) throws HyracksDataException {
             ARecordVisitablePointable record;
             // process only objects (records)
             if (object.getByteArray()[object.getStartOffset()] == ATypeTag.SERIALIZED_RECORD_TYPE_TAG) {
@@ -225,16 +278,46 @@ public class ArrayStarDescriptor extends AbstractScalarFunctionDynamicDescriptor
 
                 List<IVisitablePointable> fieldNames = record.getFieldNames();
                 List<IVisitablePointable> fieldValues = record.getFieldValues();
-                IVisitablePointable[] values;
+                List<IVisitablePointable> values;
+                IVisitablePointable fieldName;
                 for (int j = 0; j < fieldNames.size(); j++) {
-                    values = fieldNameToValues.get(fieldNames.get(j));
-                    if (values == null) {
-                        values = new IVisitablePointable[numObjects];
-                        fieldNameToValues.put(fieldNames.get(j), values);
+                    fieldName = fieldNames.get(j);
+                    FieldNameToValues fieldNameToValues = findField(fieldName, fieldNameToValuesList, binaryStrComp);
+
+                    if (fieldNameToValues == null) {
+                        // new field name
+                        fieldNameToValues = fieldNameToValuesAllocator.allocate(null);
+                        values = arrayListAllocator.allocate(null);
+                        clear(values, numObjects);
+                        fieldNameToValues.fieldName = fieldName;
+                        fieldNameToValues.values = values;
+                        fieldNameToValuesList.add(fieldNameToValues);
+                    } else {
+                        // field name already exists, get the values vector
+                        values = fieldNameToValues.values;
                     }
-                    values[objectIndex] = fieldValues.get(j);
+                    values.set(objectIndex, fieldValues.get(j));
+                }
+            }
+        }
+
+        private void clear(List<IVisitablePointable> values, int numObjects) {
+            values.clear();
+            for (int i = 1; i <= numObjects; i++) {
+                values.add(null);
+            }
+        }
+
+        private FieldNameToValues findField(IVisitablePointable fieldName, List<FieldNameToValues> fieldNamesList,
+                IBinaryComparator strComp) throws HyracksDataException {
+            FieldNameToValues anotherFieldName;
+            for (int i = 0; i < fieldNamesList.size(); i++) {
+                anotherFieldName = fieldNamesList.get(i);
+                if (PointableHelper.isEqual(fieldName, anotherFieldName.fieldName, strComp)) {
+                    return anotherFieldName;
                 }
             }
+            return null;
         }
     }
 }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
index 45b9fdc..d649c48 100755
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/ArraySymDiffEval.java
@@ -20,8 +20,6 @@ package org.apache.asterix.runtime.evaluators.functions;
 
 import java.util.List;
 
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import org.apache.asterix.builders.ArrayListFactory;
 import org.apache.asterix.builders.IAsterixListBuilder;
 import org.apache.asterix.dataflow.data.nontagged.comparators.AObjectAscBinaryComparatorFactory;
@@ -40,6 +38,10 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.storage.common.arraylist.IntArrayList;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 
 public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
     private final IBinaryHashFunction binaryHashFunction;
@@ -47,6 +49,7 @@ public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
     private final IObjectPool<List<ValueCounter>, ATypeTag> arrayListAllocator;
     private final IObjectPool<ValueCounter, ATypeTag> valueCounterAllocator;
     private final IBinaryComparator comp;
+    private final IntArrayList intHashes;
 
     public ArraySymDiffEval(IScalarEvaluatorFactory[] args, IHyracksTaskContext ctx, SourceLocation sourceLocation,
             IAType[] argTypes) throws HyracksDataException {
@@ -55,6 +58,7 @@ public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
         valueCounterAllocator = new ListObjectPool<>(new ValueCounterFactory());
         hashes = new Int2ObjectOpenHashMap<>();
         comp = AObjectAscBinaryComparatorFactory.INSTANCE.createBinaryComparator();
+        intHashes = new IntArrayList(50, 10);
         binaryHashFunction = BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(null)
                 .createBinaryHashFunction();
     }
@@ -100,14 +104,18 @@ public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
     @Override
     protected void init() {
         hashes.clear();
+        intHashes.clear();
     }
 
     @Override
     protected void finish(IAsterixListBuilder listBuilder) throws HyracksDataException {
         ValueCounter item;
-        for (List<ValueCounter> entry : hashes.values()) {
-            for (int i = 0; i < entry.size(); i++) {
-                item = entry.get(i);
+        List<ValueCounter> items;
+        // TODO(ali): temp solution to avoid iterator object creation, find a better way
+        for (int i = 0; i < intHashes.size(); i++) {
+            items = hashes.get(intHashes.get(i));
+            for (int k = 0; k < items.size(); k++) {
+                item = items.get(k);
                 if (checkCounter(item.counter)) {
                     listBuilder.addItem(item.value);
                 }
@@ -137,12 +145,13 @@ public class ArraySymDiffEval extends AbstractArrayProcessArraysEval {
             sameHashes.clear();
             addItem(item, listIndex, sameHashes);
             hashes.put(hash, sameHashes);
+            intHashes.add(hash);
             return true;
         } else {
             // potentially, item already exists
             ValueCounter itemListIdxCounter = ArrayFunctionsUtil.findItem(item, sameHashes, comp);
             if (itemListIdxCounter == null) {
-                // new item
+                // new item having the same hash as a different item
                 addItem(item, listIndex, sameHashes);
                 return true;
             }
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
index d1879b2..b9f693b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/CastTypeEvaluator.java
@@ -48,10 +48,10 @@ public class CastTypeEvaluator implements IScalarEvaluator {
     }
 
     public CastTypeEvaluator(IAType reqType, IAType inputType, IScalarEvaluator argEvaluator) {
-        reset(reqType, inputType, argEvaluator);
+        resetAndAllocate(reqType, inputType, argEvaluator);
     }
 
-    public void reset(IAType reqType, IAType inputType, IScalarEvaluator argEvaluator) {
+    public void resetAndAllocate(IAType reqType, IAType inputType, IScalarEvaluator argEvaluator) {
         this.argEvaluator = argEvaluator;
         this.inputPointable = allocatePointable(inputType, reqType);
         this.resultPointable = allocatePointable(reqType, inputType);
@@ -76,6 +76,12 @@ public class CastTypeEvaluator implements IScalarEvaluator {
         result.set(resultPointable);
     }
 
+    // TODO: refactor in a better way
+    protected void cast(IPointable argPointable, IPointable result) throws HyracksDataException {
+        inputPointable.set(argPointable);
+        cast(result);
+    }
+
     // Allocates the result pointable.
     private IVisitablePointable allocatePointable(IAType typeForPointable, IAType typeForOtherSide) {
         if (!typeForPointable.equals(BuiltinType.ANY)) {
@@ -98,4 +104,8 @@ public class CastTypeEvaluator implements IScalarEvaluator {
                 return allocator.allocateFieldValue(null);
         }
     }
+
+    public void deallocatePointables() {
+        allocator.reset();
+    }
 }


[asterixdb] 01/03: [NO ISSUE][NET] Catch All Network Unexpected Exceptions

Posted by mh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 83f2efa367c20bc0a2bb406859fce2a8a24b9872
Author: Murtadha Hubail <mh...@apache.org>
AuthorDate: Fri Jan 25 00:18:58 2019 +0300

    [NO ISSUE][NET] Catch All Network Unexpected Exceptions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    - In case of any unexpected exception during
      IPC network operations, close the connection
      to allow it to be reestablished.
    
    Change-Id: I57db83faa1d1ecbc4702ca06e64e21fedb186313
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3132
    Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Michael Blow <mb...@apache.org>
---
 .../java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 9ef506e..b4828e9 100644
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@ -232,7 +232,7 @@ public class IPCConnectionManager {
                     connectableKey.interestOps(SelectionKey.OP_READ);
                     connectionEstablished(handle);
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.warn("Exception finishing connect", e);
             } finally {
                 if (!connected) {
@@ -253,7 +253,7 @@ public class IPCConnectionManager {
                 handle.setKey(channelKey);
                 channelKey.attach(handle);
                 handle.setState(HandleState.CONNECT_RECEIVED);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.error("Failed to accept channel ", e);
                 close(channelKey, channel);
             }
@@ -274,7 +274,7 @@ public class IPCConnectionManager {
                     }
                     handle.setKey(channelKey);
                     channelKey.attach(handle);
-                } catch (IOException e) {
+                } catch (Exception e) {
                     LOGGER.error("Failed to accept channel ", e);
                     close(channelKey, channel);
                     handle.setState(HandleState.CLOSED);
@@ -377,7 +377,7 @@ public class IPCConnectionManager {
                 if (!readBuffer.hasRemaining()) {
                     handle.resizeInBuffer();
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.error("TCP read error from {}", handle.getRemoteAddress(), e);
                 close(readableKey, channel);
             }
@@ -401,7 +401,7 @@ public class IPCConnectionManager {
                     handle.clearFull();
                     selector.wakeup();
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOGGER.error("TCP write error to {}", handle.getRemoteAddress(), e);
                 close(writableKey, channel);
             }


[asterixdb] 03/03: Merge commit '04aebfe' from 'stabilization-f69489' into 'master'

Posted by mh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 0615d611330786889b2ee2df376fcc98bec8fa28
Merge: 4e1773f 04aebfe
Author: Michael Blow <mb...@apache.org>
AuthorDate: Sun Feb 3 00:21:39 2019 -0500

    Merge commit '04aebfe' from 'stabilization-f69489' into 'master'
    
    Change-Id: I2d6ed58a6f11efff839bacab4bbe552d92eb9371

 .../array_intersect/array_intersect.3.query.sqlpp  |   5 +-
 .../array_intersect/array_intersect.3.adm          |   2 +-
 .../array_fun/array_symdiff/array_symdiff.3.adm    |   2 +-
 .../array_fun/array_symdiffn/array_symdiffn.3.adm  |   2 +-
 .../functions/AbstractArrayAddRemoveEval.java      | 137 +++++-----
 .../functions/AbstractArrayProcessArraysEval.java  |  71 +++---
 .../functions/AbstractArrayProcessEval.java        |  12 +-
 .../functions/AbstractArraySearchEval.java         |   9 +-
 .../functions/ArrayDistinctDescriptor.java         |   4 +-
 .../functions/ArrayFlattenDescriptor.java          |  69 +++---
 .../functions/ArrayIntersectDescriptor.java        | 275 ++++++++++++---------
 .../evaluators/functions/ArrayPutDescriptor.java   |  27 +-
 .../functions/ArrayRemoveDescriptor.java           |  10 +-
 .../functions/ArrayReplaceDescriptor.java          |  67 ++---
 .../evaluators/functions/ArraySortDescriptor.java  |   4 +-
 .../evaluators/functions/ArrayStarDescriptor.java  | 139 ++++++++---
 .../evaluators/functions/ArraySymDiffEval.java     |  21 +-
 .../evaluators/functions/CastTypeEvaluator.java    |  14 +-
 .../hyracks/ipc/impl/IPCConnectionManager.java     |  10 +-
 19 files changed, 529 insertions(+), 351 deletions(-)

diff --cc hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
index 764ec7d,b4828e9..64881dc
--- a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCConnectionManager.java
@@@ -236,15 -229,10 +236,15 @@@ public class IPCConnectionManager 
              try {
                  connected = channel.finishConnect();
                  if (connected) {
 -                    connectableKey.interestOps(SelectionKey.OP_READ);
 -                    connectionEstablished(handle);
 +                    SelectionKey channelKey = channel.register(selector, SelectionKey.OP_READ);
 +                    final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel);
 +                    if (clientChannel.requiresHandshake()) {
 +                        asyncHandshake(clientChannel, handle, channelKey);
 +                    } else {
 +                        connectionEstablished(handle, channelKey, clientChannel);
 +                    }
                  }
-             } catch (IOException e) {
+             } catch (Exception e) {
                  LOGGER.warn("Exception finishing connect", e);
              } finally {
                  if (!connected) {
@@@ -260,14 -248,12 +260,14 @@@
              try {
                  channel = serverSocketChannel.accept();
                  register(channel);
 +                final ISocketChannel serverChannel = socketChannelFactory.createServerChannel(channel);
                  channelKey = channel.register(selector, SelectionKey.OP_READ);
 -                IPCHandle handle = new IPCHandle(system, null);
 -                handle.setKey(channelKey);
 -                channelKey.attach(handle);
 -                handle.setState(HandleState.CONNECT_RECEIVED);
 +                if (serverChannel.requiresHandshake()) {
 +                    asyncHandshake(serverChannel, null, channelKey);
 +                } else {
 +                    connectionReceived(serverChannel, channelKey);
 +                }
-             } catch (IOException e) {
+             } catch (Exception e) {
                  LOGGER.error("Failed to accept channel ", e);
                  close(channelKey, channel);
              }
@@@ -282,18 -268,13 +282,18 @@@
                      register(channel);
                      if (channel.connect(handle.getRemoteAddress())) {
                          channelKey = channel.register(selector, SelectionKey.OP_READ);
 -                        connectionEstablished(handle);
 +                        final ISocketChannel clientChannel = socketChannelFactory.createClientChannel(channel);
 +                        if (clientChannel.requiresHandshake()) {
 +                            asyncHandshake(clientChannel, handle, channelKey);
 +                        } else {
 +                            connectionEstablished(handle, channelKey, clientChannel);
 +                        }
                      } else {
                          channelKey = channel.register(selector, SelectionKey.OP_CONNECT);
 +                        handle.setKey(channelKey);
 +                        channelKey.attach(handle);
                      }
-                 } catch (IOException e) {
 -                    handle.setKey(channelKey);
 -                    channelKey.attach(handle);
+                 } catch (Exception e) {
                      LOGGER.error("Failed to accept channel ", e);
                      close(channelKey, channel);
                      handle.setState(HandleState.CLOSED);