You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/02/13 03:15:37 UTC

[08/24] incubator-asterixdb git commit: Move to non-copy-based evaluator interfaces for all function implementations, including scalar functions, aggregate functions, running aggregate functions, and unnesting functions

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SpatialIntersectDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SpatialIntersectDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SpatialIntersectDescriptor.java
index bec1f79..93eeee9 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SpatialIntersectDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SpatialIntersectDescriptor.java
@@ -43,11 +43,13 @@ import org.apache.asterix.runtime.evaluators.common.SpatialUtils;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
@@ -63,19 +65,21 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
     };
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
-        return new ICopyEvaluatorFactory() {
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
+        return new IScalarEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-                return new ICopyEvaluator() {
-
-                    private final DataOutput out = output.getDataOutput();
-                    private final ArrayBackedValueStorage outInput0 = new ArrayBackedValueStorage();
-                    private final ArrayBackedValueStorage outInput1 = new ArrayBackedValueStorage();
-                    private final ICopyEvaluator eval0 = args[0].createEvaluator(outInput0);
-                    private final ICopyEvaluator eval1 = args[1].createEvaluator(outInput1);
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+                return new IScalarEvaluator() {
+
+                    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+                    private final DataOutput out = resultStorage.getDataOutput();
+                    private final IPointable inputArg0 = new VoidPointable();
+                    private final IPointable inputArg1 = new VoidPointable();
+                    private final IScalarEvaluator eval0 = args[0].createScalarEvaluator(ctx);
+                    private final IScalarEvaluator eval1 = args[1].createScalarEvaluator(ctx);
                     private final IBinaryComparator ascDoubleComp = AqlBinaryComparatorFactoryProvider.DOUBLE_POINTABLE_INSTANCE
                             .createBinaryComparator();
                     private final SpatialUtils spatialUtils = new SpatialUtils();
@@ -108,14 +112,15 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return true;
                     }
 
-                    private boolean pointInPolygon(byte[] bytes0, byte[] bytes1) throws HyracksDataException { // ray casting
+                    private boolean pointInPolygon(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException { // ray casting
 
                         double pX = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
                         double pY = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
                         int numOfPoints1 = AInt16SerializerDeserializer.getShort(bytes1,
-                                APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                offset1 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
 
                         if (numOfPoints1 < 3) {
                             throw new HyracksDataException(AsterixBuiltinFunctions.SPATIAL_INTERSECT.getName()
@@ -126,21 +131,21 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         double xInters;
                         double x1, x2, y1, y2;
                         x1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
+                                offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
                         y1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
+                                offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
 
                         for (int i = 1; i <= numOfPoints1; i++) {
                             if (i == numOfPoints1) {
                                 x2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
                                 y2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
                             } else {
                                 x2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.X));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.X));
                                 y2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.Y));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.Y));
                             }
 
                             if (!pointOnLine(pX, pY, x1, y1, x2, y2)) {
@@ -167,18 +172,19 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         }
                     }
 
-                    private boolean pointInCircle(byte[] bytes0, byte[] bytes1) throws HyracksDataException {
+                    private boolean pointInCircle(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException {
                         double x = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
                         double y = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
 
                         double cX = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
                         double cY = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
                         double radius = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getRadiusOffset());
+                                offset1 + ACircleSerializerDeserializer.getRadiusOffset());
 
                         if ((x - cX) * (x - cX) + (y - cY) * (y - cY) <= (radius * radius)) {
                             return true;
@@ -212,18 +218,19 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return false;
                     }
 
-                    private boolean linePolygonIntersection(byte[] bytes0, byte[] bytes1) throws HyracksDataException {
+                    private boolean linePolygonIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException {
                         double startX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
                         double startY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
                         double endX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
                         double endY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
 
                         int numOfPoints1 = AInt16SerializerDeserializer.getShort(bytes1,
-                                APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                offset1 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
 
                         if (numOfPoints1 < 3) {
                             throw new HyracksDataException(AsterixBuiltinFunctions.SPATIAL_INTERSECT.getName()
@@ -231,22 +238,22 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         }
                         for (int i = 0; i < numOfPoints1; i++) {
                             double startX2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                    APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.X));
+                                    offset1 + APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.X));
                             double startY2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                    APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.Y));
+                                    offset1 + APolygonSerializerDeserializer.getCoordinateOffset(i, Coordinate.Y));
 
                             double endX2;
                             double endY2;
                             if (i + 1 == numOfPoints1) {
                                 endX2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.X));
                                 endY2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
+                                        offset1 + APolygonSerializerDeserializer.getCoordinateOffset(0, Coordinate.Y));
                             } else {
-                                endX2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(i + 1, Coordinate.X));
-                                endY2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                        APolygonSerializerDeserializer.getCoordinateOffset(i + 1, Coordinate.Y));
+                                endX2 = ADoubleSerializerDeserializer.getDouble(bytes1, offset1
+                                        + APolygonSerializerDeserializer.getCoordinateOffset(i + 1, Coordinate.X));
+                                endY2 = ADoubleSerializerDeserializer.getDouble(bytes1, offset1
+                                        + APolygonSerializerDeserializer.getCoordinateOffset(i + 1, Coordinate.Y));
                             }
 
                             boolean intersect = lineLineIntersection(startX1, startY1, endX1, endY1, startX2, startY2,
@@ -258,26 +265,26 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return false;
                     }
 
-                    private boolean lineRectangleIntersection(byte[] bytes0, byte[] bytes1)
+                    private boolean lineRectangleIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
                             throws HyracksDataException {
                         double startX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
                         double startY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
                         double endX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
                         double endY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
 
                         double x1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
+                                offset1 + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
                         double y1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
+                                offset1 + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
 
                         double x2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
+                                offset1 + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
                         double y2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
+                                offset1 + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
 
                         if (lineLineIntersection(startX1, startY1, endX1, endY1, x1, y1, x1, y2)
                                 || lineLineIntersection(startX1, startY1, endX1, endY1, x1, y2, x2, y2)
@@ -289,22 +296,23 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
 
                     }
 
-                    private boolean lineCircleIntersection(byte[] bytes0, byte[] bytes1) throws HyracksDataException {
+                    private boolean lineCircleIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException {
                         double startX = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.X));
                         double startY = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getStartPointCoordinateOffset(Coordinate.Y));
                         double endX = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.X));
                         double endY = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ALineSerializerDeserializer.getEndPointCoordinateOffset(Coordinate.Y));
 
                         double cX = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
                         double cY = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
                         double radius = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getRadiusOffset());
+                                offset1 + ACircleSerializerDeserializer.getRadiusOffset());
 
                         double dx = endX - startX;
                         double dy = endY - startY;
@@ -325,25 +333,25 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return false;
                     }
 
-                    private boolean findEar(byte[] bytes, int u, int v, int w, int n, IntArray pointsOffsets)
-                            throws HyracksDataException {
+                    private boolean findEar(byte[] bytes, int offset, int u, int v, int w, int n,
+                            IntArray pointsOffsets) throws HyracksDataException {
                         int p;
                         double Ax, Ay, Bx, By, Cx, Cy, Px, Py;
 
-                        Ax = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(u), Coordinate.X));
-                        Ay = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(u), Coordinate.Y));
+                        Ax = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(u), Coordinate.X));
+                        Ay = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(u), Coordinate.Y));
 
-                        Bx = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(v), Coordinate.X));
-                        By = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(v), Coordinate.Y));
+                        Bx = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(v), Coordinate.X));
+                        By = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(v), Coordinate.Y));
 
-                        Cx = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(w), Coordinate.X));
-                        Cy = ADoubleSerializerDeserializer.getDouble(bytes,
-                                APolygonSerializerDeserializer.getCoordinateOffset(pointsOffsets.get(w), Coordinate.Y));
+                        Cx = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(w), Coordinate.X));
+                        Cy = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
+                                .getCoordinateOffset(pointsOffsets.get(w), Coordinate.Y));
 
                         if (SpatialUtils.doubleEpsilon() > (((Bx - Ax) * (Cy - Ay)) - ((By - Ay) * (Cx - Ax)))) {
 
@@ -354,9 +362,9 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                             if ((p == u) || (p == v) || (p == w)) {
                                 continue;
                             }
-                            Px = ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
+                            Px = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
                                     .getCoordinateOffset(pointsOffsets.get(p), Coordinate.X));
-                            Py = ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
+                            Py = ADoubleSerializerDeserializer.getDouble(bytes, offset + APolygonSerializerDeserializer
                                     .getCoordinateOffset(pointsOffsets.get(p), Coordinate.Y));
                             if (pointInsideTriangle(Ax, Ay, Bx, By, Cx, Cy, Px, Py)) {
                                 return false;
@@ -366,7 +374,7 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return true;
                     }
 
-                    private int triangulatePolygon(byte[] bytes, int numOfPoints, IntArray pointsOffsets,
+                    private int triangulatePolygon(byte[] bytes, int offset, int numOfPoints, IntArray pointsOffsets,
                             DoubleArray trianglesX, DoubleArray trianglesY, int triangleId,
                             int nonSimplePolygonDetection, int middleVertex) throws HyracksDataException { // Ear clipping
 
@@ -394,34 +402,40 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                 w = 0;
                             }
 
-                            if (findEar(bytes, u, v, w, numOfPoints, pointsOffsets)) {
+                            if (findEar(bytes, offset, u, v, w, numOfPoints, pointsOffsets)) {
                                 int s, t;
 
                                 addRectangle(trianglesX, trianglesY);
 
                                 SpatialUtils.setTriangleXCoordinate(trianglesX, triangleId, 0,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(u), Coordinate.X)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(u), Coordinate.X)));
 
                                 SpatialUtils.setTriangleYCoordinate(trianglesY, triangleId, 0,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(u), Coordinate.Y)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(u), Coordinate.Y)));
 
                                 SpatialUtils.setTriangleXCoordinate(trianglesX, triangleId, 1,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(v), Coordinate.X)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(v), Coordinate.X)));
 
                                 SpatialUtils.setTriangleYCoordinate(trianglesY, triangleId, 1,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(v), Coordinate.Y)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(v), Coordinate.Y)));
 
                                 SpatialUtils.setTriangleXCoordinate(trianglesX, triangleId, 2,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(w), Coordinate.X)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(w), Coordinate.X)));
 
                                 SpatialUtils.setTriangleYCoordinate(trianglesY, triangleId, 2,
-                                        ADoubleSerializerDeserializer.getDouble(bytes, APolygonSerializerDeserializer
-                                                .getCoordinateOffset(pointsOffsets.get(w), Coordinate.Y)));
+                                        ADoubleSerializerDeserializer.getDouble(bytes,
+                                                offset + APolygonSerializerDeserializer
+                                                        .getCoordinateOffset(pointsOffsets.get(w), Coordinate.Y)));
 
                                 // remove v from polygon
                                 for (s = v, t = v + 1; t < numOfPoints; s++, t++) {
@@ -470,15 +484,15 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return (cp1 * cp2) >= 0.0;
                     }
 
-                    private boolean circleTriangleIntersection(byte[] bytes0, DoubleArray trianglesX,
+                    private boolean circleTriangleIntersection(byte[] bytes0, int offset0, DoubleArray trianglesX,
                             DoubleArray trianglesY, int triangleId) throws HyracksDataException { // separating axis theorem
 
                         double cX = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
+                                offset0 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
                         double cY = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
                         double radius = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getRadiusOffset());
+                                offset0 + ACircleSerializerDeserializer.getRadiusOffset());
 
                         double distance = Double.MAX_VALUE;
                         double distanceSquared;
@@ -537,20 +551,21 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return true;
                     }
 
-                    private boolean circleCircleIntersection(byte[] bytes0, byte[] bytes1) throws HyracksDataException {
+                    private boolean circleCircleIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException {
                         double cX0 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
+                                offset0 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
                         double cY0 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
+                                offset0 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
                         double radius0 = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                ACircleSerializerDeserializer.getRadiusOffset());
+                                offset0 + ACircleSerializerDeserializer.getRadiusOffset());
 
                         double cX1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.X));
                         double cY1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
+                                offset1 + ACircleSerializerDeserializer.getCenterPointCoordinateOffset(Coordinate.Y));
                         double radius1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ACircleSerializerDeserializer.getRadiusOffset());
+                                offset1 + ACircleSerializerDeserializer.getRadiusOffset());
 
                         double distanceSquared = SpatialUtils.dotProduct(cX0 - cX1, cY0 - cY1, cX0 - cX1, cY0 - cY1);
                         double radiusDistanceSquared = (radius0 + radius1) * (radius0 + radius1);
@@ -560,10 +575,10 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return false;
                     }
 
-                    private void getCounterClockWisePolygon(byte[] bytes, IntArray pointsOffsets, int numOfPoints)
-                            throws HyracksDataException {
+                    private void getCounterClockWisePolygon(byte[] bytes, int offset, IntArray pointsOffsets,
+                            int numOfPoints) throws HyracksDataException {
                         pointsOffsets.reset();
-                        if (SpatialUtils.polygonArea(bytes, numOfPoints) > 0.0) {
+                        if (SpatialUtils.polygonArea(bytes, offset, numOfPoints) > 0.0) {
                             for (int i = 0; i < numOfPoints; i++) {
                                 pointsOffsets.add(i);
                             }
@@ -574,22 +589,23 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         }
                     }
 
-                    private boolean pointInRectangle(byte[] bytes0, byte[] bytes1) throws HyracksDataException {
+                    private boolean pointInRectangle(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
+                            throws HyracksDataException {
 
                         double pX = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
                         double pY = ADoubleSerializerDeserializer.getDouble(bytes0,
-                                APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
+                                offset0 + APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
 
                         double x1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
+                                offset1 + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
                         double y1 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
+                                offset1 + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
 
                         double x2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
+                                offset1 + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
                         double y2 = ADoubleSerializerDeserializer.getDouble(bytes1,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
+                                offset1 + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
 
                         if (pointInsideTriangle(x1, y1, x1, y2, x2, y2, pX, pY)
                                 || pointInsideTriangle(x1, y1, x2, y1, x2, y2, pX, pY)) {
@@ -607,13 +623,13 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         }
                     }
 
-                    private boolean rectangleCircleIntersection(byte[] bytes0, byte[] bytes1)
+                    private boolean rectangleCircleIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
                             throws HyracksDataException {
-                        triangulateRectangle(bytes0, trianglesX0, trianglesY0);
+                        triangulateRectangle(bytes0, offset0, trianglesX0, trianglesY0);
                         boolean res = false;
                         // 2 triangles in a rectangle
                         for (int i = 0; i < 2; i++) {
-                            res = circleTriangleIntersection(bytes1, trianglesX0, trianglesY0, i);
+                            res = circleTriangleIntersection(bytes1, offset1, trianglesX0, trianglesY0, i);
                             if (res) {
                                 break;
                             }
@@ -621,17 +637,17 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return res;
                     }
 
-                    private void triangulateRectangle(byte[] bytes, DoubleArray trianglesX, DoubleArray trianglesY)
-                            throws HyracksDataException {
+                    private void triangulateRectangle(byte[] bytes, int offset, DoubleArray trianglesX,
+                            DoubleArray trianglesY) throws HyracksDataException {
                         double x1 = ADoubleSerializerDeserializer.getDouble(bytes,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
+                                offset + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.X));
                         double y1 = ADoubleSerializerDeserializer.getDouble(bytes,
-                                ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
+                                offset + ARectangleSerializerDeserializer.getBottomLeftCoordinateOffset(Coordinate.Y));
 
                         double x2 = ADoubleSerializerDeserializer.getDouble(bytes,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
+                                offset + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.X));
                         double y2 = ADoubleSerializerDeserializer.getDouble(bytes,
-                                ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
+                                offset + ARectangleSerializerDeserializer.getUpperRightCoordinateOffset(Coordinate.Y));
                         trianglesX.reset();
                         trianglesY.reset();
 
@@ -657,17 +673,17 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         SpatialUtils.setTriangleYCoordinate(trianglesY, 1, 2, y1);
                     }
 
-                    private boolean rectanglePolygonIntersection(byte[] bytes0, byte[] bytes1)
+                    private boolean rectanglePolygonIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
                             throws HyracksDataException, AlgebricksException {
                         int numOfPoints1 = AInt16SerializerDeserializer.getShort(bytes1,
-                                APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                offset1 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
 
                         if (numOfPoints1 < 3) {
                             throw new HyracksDataException(AsterixBuiltinFunctions.SPATIAL_INTERSECT.getName()
                                     + ": polygon must have at least 3 points.");
                         }
 
-                        getCounterClockWisePolygon(bytes1, pointsOffsets1, numOfPoints1);
+                        getCounterClockWisePolygon(bytes1, offset1, pointsOffsets1, numOfPoints1);
                         int nonSimplePolygonDetection1 = 2 * numOfPoints1;
                         int middleVertex1 = numOfPoints1 - 1;
                         int numOfTriangles1 = 0;
@@ -675,8 +691,9 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         trianglesX1.reset();
                         trianglesY1.reset();
                         while (true) {
-                            middleVertex1 = triangulatePolygon(bytes1, numOfPoints1, pointsOffsets1, trianglesX1,
-                                    trianglesY1, numOfTriangles1, nonSimplePolygonDetection1, middleVertex1);
+                            middleVertex1 = triangulatePolygon(bytes1, offset1, numOfPoints1, pointsOffsets1,
+                                    trianglesX1, trianglesY1, numOfTriangles1, nonSimplePolygonDetection1,
+                                    middleVertex1);
 
                             if (middleVertex1 == -1) {
                                 break;
@@ -687,7 +704,7 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                             numOfTriangles1++;
                         }
 
-                        triangulateRectangle(bytes0, trianglesX0, trianglesY0);
+                        triangulateRectangle(bytes0, offset0, trianglesX0, trianglesY0);
                         boolean res = false;
                         // 2 triangles in a rectangle
                         for (int j = 0; j < 2; j++) {
@@ -709,17 +726,17 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         return false;
                     }
 
-                    private boolean polygonCircleIntersection(byte[] bytes0, byte[] bytes1)
+                    private boolean polygonCircleIntersection(byte[] bytes0, int offset0, byte[] bytes1, int offset1)
                             throws HyracksDataException, AlgebricksException {
                         int numOfPoints = AInt16SerializerDeserializer.getShort(bytes0,
-                                APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                offset0 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
 
                         if (numOfPoints < 3) {
                             throw new HyracksDataException(AsterixBuiltinFunctions.SPATIAL_INTERSECT.getName()
                                     + ": polygon must have at least 3 points.");
                         }
 
-                        getCounterClockWisePolygon(bytes0, pointsOffsets0, numOfPoints);
+                        getCounterClockWisePolygon(bytes0, offset0, pointsOffsets0, numOfPoints);
                         int nonSimplePolygonDetection = 2 * numOfPoints;
                         int middleVertex = numOfPoints - 1;
                         int numOfTriangles = 0;
@@ -728,7 +745,7 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         trianglesY0.reset();
                         boolean res = false;
                         while (true) {
-                            middleVertex = triangulatePolygon(bytes0, numOfPoints, pointsOffsets0, trianglesX0,
+                            middleVertex = triangulatePolygon(bytes0, offset0, numOfPoints, pointsOffsets0, trianglesX0,
                                     trianglesY0, numOfTriangles, nonSimplePolygonDetection, middleVertex);
 
                             if (middleVertex == -1) {
@@ -739,7 +756,7 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                             numOfTriangles++;
                             int lastTriangle = (trianglesX0.length() / 3) - 1;
 
-                            res = circleTriangleIntersection(bytes1, trianglesX0, trianglesY0, lastTriangle);
+                            res = circleTriangleIntersection(bytes1, offset1, trianglesX0, trianglesY0, lastTriangle);
                             if (res) {
                                 return true;
                             }
@@ -748,73 +765,70 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                     }
 
                     @Override
-                    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
-                        outInput0.reset();
-                        eval0.evaluate(tuple);
-                        outInput1.reset();
-                        eval1.evaluate(tuple);
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                        resultStorage.reset();
+                        eval0.evaluate(tuple, inputArg0);
+                        eval1.evaluate(tuple, inputArg1);
+
+                        byte[] bytes0 = inputArg0.getByteArray();
+                        byte[] bytes1 = inputArg1.getByteArray();
+                        int offset0 = inputArg0.getStartOffset();
+                        int offset1 = inputArg1.getStartOffset();
 
                         try {
                             boolean res = false;
-                            ATypeTag tag0 = EnumDeserializer.ATYPETAGDESERIALIZER
-                                    .deserialize(outInput0.getByteArray()[0]);
-                            ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER
-                                    .deserialize(outInput1.getByteArray()[0]);
+                            ATypeTag tag0 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes0[offset0]);
+                            ATypeTag tag1 = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes1[offset1]);
 
                             switch (tag0) {
                                 case POINT:
                                     switch (tag1) {
                                         case POINT:
-                                            if (ascDoubleComp
-                                                    .compare(outInput0.getByteArray(),
-                                                            APointSerializerDeserializer.getCoordinateOffset(
-                                                                    Coordinate.X),
-                                                            8, outInput1.getByteArray(), APointSerializerDeserializer
-                                                                    .getCoordinateOffset(Coordinate.X),
-                                                            8) == 0) {
-                                                if (ascDoubleComp
-                                                        .compare(outInput0.getByteArray(),
-                                                                APointSerializerDeserializer.getCoordinateOffset(
-                                                                        Coordinate.Y),
-                                                                8, outInput1.getByteArray(),
-                                                                APointSerializerDeserializer
-                                                                        .getCoordinateOffset(Coordinate.Y),
-                                                                8) == 0) {
+                                            if (ascDoubleComp.compare(bytes0,
+                                                    offset0 + APointSerializerDeserializer
+                                                            .getCoordinateOffset(Coordinate.X),
+                                                    8, bytes1, offset1 + APointSerializerDeserializer
+                                                            .getCoordinateOffset(Coordinate.X),
+                                                    8) == 0) {
+                                                if (ascDoubleComp.compare(bytes0,
+                                                        offset0 + APointSerializerDeserializer
+                                                                .getCoordinateOffset(Coordinate.Y),
+                                                        8, bytes1, offset1 + APointSerializerDeserializer
+                                                                .getCoordinateOffset(Coordinate.Y),
+                                                        8) == 0) {
                                                     res = true;
                                                 }
                                             }
                                             break;
                                         case LINE:
-                                            double pX = ADoubleSerializerDeserializer.getDouble(
-                                                    outInput0.getByteArray(),
-                                                    APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
-                                            double pY = ADoubleSerializerDeserializer.getDouble(
-                                                    outInput0.getByteArray(),
-                                                    APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
-
-                                            double startX = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double pX = ADoubleSerializerDeserializer.getDouble(bytes0, offset0
+                                                    + APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
+                                            double pY = ADoubleSerializerDeserializer.getDouble(bytes0, offset0
+                                                    + APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
+
+                                            double startX = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.X));
-                                            double startY = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double startY = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.Y));
-                                            double endX = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double endX = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.X));
-                                            double endY = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double endY = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.Y));
 
                                             res = pointOnLine(pX, pY, startX, startY, endX, endY);
                                             break;
                                         case POLYGON:
-                                            res = pointInPolygon(outInput0.getByteArray(), outInput1.getByteArray());
+                                            res = pointInPolygon(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case CIRCLE:
-                                            res = pointInCircle(outInput0.getByteArray(), outInput1.getByteArray());
+                                            res = pointInCircle(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case RECTANGLE:
-                                            res = pointInRectangle(outInput0.getByteArray(), outInput1.getByteArray());
+                                            res = pointInRectangle(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case NULL:
                                             res = false;
@@ -828,68 +842,63 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                 case LINE:
                                     switch (tag1) {
                                         case POINT:
-                                            double pX = ADoubleSerializerDeserializer.getDouble(
-                                                    outInput1.getByteArray(),
-                                                    APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
-                                            double pY = ADoubleSerializerDeserializer.getDouble(
-                                                    outInput1.getByteArray(),
-                                                    APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
-
-                                            double startX = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double pX = ADoubleSerializerDeserializer.getDouble(bytes1, offset1
+                                                    + APointSerializerDeserializer.getCoordinateOffset(Coordinate.X));
+                                            double pY = ADoubleSerializerDeserializer.getDouble(bytes1, offset1
+                                                    + APointSerializerDeserializer.getCoordinateOffset(Coordinate.Y));
+
+                                            double startX = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.X));
-                                            double startY = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double startY = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.Y));
-                                            double endX = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double endX = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.X));
-                                            double endY = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double endY = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.Y));
 
                                             res = pointOnLine(pX, pY, startX, startY, endX, endY);
                                             break;
                                         case LINE:
-                                            double startX1 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double startX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.X));
-                                            double startY1 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double startY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.Y));
-                                            double endX1 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double endX1 = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.X));
-                                            double endY1 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput0.getByteArray(), ALineSerializerDeserializer
+                                            double endY1 = ADoubleSerializerDeserializer.getDouble(bytes0,
+                                                    offset0 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.Y));
 
-                                            double startX2 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double startX2 = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.X));
-                                            double startY2 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double startY2 = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getStartPointCoordinateOffset(Coordinate.Y));
-                                            double endX2 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double endX2 = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.X));
-                                            double endY2 = ADoubleSerializerDeserializer
-                                                    .getDouble(outInput1.getByteArray(), ALineSerializerDeserializer
+                                            double endY2 = ADoubleSerializerDeserializer.getDouble(bytes1,
+                                                    offset1 + ALineSerializerDeserializer
                                                             .getEndPointCoordinateOffset(Coordinate.Y));
                                             res = lineLineIntersection(startX1, startY1, endX1, endY1, startX2, startY2,
                                                     endX2, endY2);
                                             break;
                                         case POLYGON:
-                                            res = linePolygonIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = linePolygonIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case CIRCLE:
-                                            res = lineCircleIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = lineCircleIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case RECTANGLE:
-                                            res = lineRectangleIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = lineRectangleIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case NULL:
                                             res = false;
@@ -903,28 +912,23 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                 case POLYGON:
                                     switch (tag1) {
                                         case POINT:
-                                            res = pointInPolygon(outInput1.getByteArray(), outInput0.getByteArray());
+                                            res = pointInPolygon(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case LINE:
-                                            res = linePolygonIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = linePolygonIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case POLYGON:
-                                            int numOfPoints0 = AInt16SerializerDeserializer.getShort(
-                                                    outInput0.getByteArray(),
-                                                    APolygonSerializerDeserializer.getNumberOfPointsOffset());
-                                            int numOfPoints1 = AInt16SerializerDeserializer.getShort(
-                                                    outInput1.getByteArray(),
-                                                    APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                            int numOfPoints0 = AInt16SerializerDeserializer.getShort(bytes0,
+                                                    offset0 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
+                                            int numOfPoints1 = AInt16SerializerDeserializer.getShort(bytes1,
+                                                    offset1 + APolygonSerializerDeserializer.getNumberOfPointsOffset());
 
                                             if (numOfPoints0 < 3 || numOfPoints1 < 3) {
                                                 throw new AlgebricksException("Polygon must have at least 3 points.");
                                             }
 
-                                            getCounterClockWisePolygon(outInput0.getByteArray(), pointsOffsets0,
-                                                    numOfPoints0);
-                                            getCounterClockWisePolygon(outInput1.getByteArray(), pointsOffsets1,
-                                                    numOfPoints1);
+                                            getCounterClockWisePolygon(bytes0, offset0, pointsOffsets0, numOfPoints0);
+                                            getCounterClockWisePolygon(bytes1, offset1, pointsOffsets1, numOfPoints1);
                                             int nonSimplePolygonDetection0 = 2 * numOfPoints0;
                                             int nonSimplePolygonDetection1 = 2 * numOfPoints1;
                                             boolean intersect = false;
@@ -935,9 +939,9 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                             trianglesX1.reset();
                                             trianglesY1.reset();
                                             while (true) {
-                                                middleVertex1 = triangulatePolygon(outInput1.getByteArray(),
-                                                        numOfPoints1, pointsOffsets1, trianglesX1, trianglesY1,
-                                                        numOfTriangles1, nonSimplePolygonDetection1, middleVertex1);
+                                                middleVertex1 = triangulatePolygon(bytes1, offset1, numOfPoints1,
+                                                        pointsOffsets1, trianglesX1, trianglesY1, numOfTriangles1,
+                                                        nonSimplePolygonDetection1, middleVertex1);
 
                                                 if (middleVertex1 == -1) {
                                                     break;
@@ -951,9 +955,9 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                             trianglesX0.reset();
                                             trianglesY0.reset();
                                             while (true) {
-                                                middleVertex0 = triangulatePolygon(outInput0.getByteArray(),
-                                                        numOfPoints0, pointsOffsets0, trianglesX0, trianglesY0,
-                                                        numOfTriangles0, nonSimplePolygonDetection0, middleVertex0);
+                                                middleVertex0 = triangulatePolygon(bytes0, offset0, numOfPoints0,
+                                                        pointsOffsets0, trianglesX0, trianglesY0, numOfTriangles0,
+                                                        nonSimplePolygonDetection0, middleVertex0);
 
                                                 if (middleVertex0 == -1) {
                                                     break;
@@ -984,12 +988,10 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                             }
                                             break;
                                         case CIRCLE:
-                                            res = polygonCircleIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = polygonCircleIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case RECTANGLE:
-                                            res = rectanglePolygonIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = rectanglePolygonIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case NULL:
                                             res = false;
@@ -1003,23 +1005,19 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                 case CIRCLE:
                                     switch (tag1) {
                                         case POINT:
-                                            res = pointInCircle(outInput1.getByteArray(), outInput0.getByteArray());
+                                            res = pointInCircle(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case LINE:
-                                            res = lineCircleIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = lineCircleIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case POLYGON:
-                                            res = polygonCircleIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = polygonCircleIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case CIRCLE:
-                                            res = circleCircleIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = circleCircleIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case RECTANGLE:
-                                            res = rectangleCircleIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = rectangleCircleIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case NULL:
                                             res = false;
@@ -1033,23 +1031,20 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                                 case RECTANGLE:
                                     switch (tag1) {
                                         case POINT:
-                                            res = pointInRectangle(outInput1.getByteArray(), outInput0.getByteArray());
+                                            res = pointInRectangle(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case LINE:
-                                            res = lineRectangleIntersection(outInput1.getByteArray(),
-                                                    outInput0.getByteArray());
+                                            res = lineRectangleIntersection(bytes1, offset1, bytes0, offset0);
                                             break;
                                         case POLYGON:
-                                            res = rectanglePolygonIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = rectanglePolygonIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case CIRCLE:
-                                            res = rectangleCircleIntersection(outInput0.getByteArray(),
-                                                    outInput1.getByteArray());
+                                            res = rectangleCircleIntersection(bytes0, offset0, bytes1, offset1);
                                             break;
                                         case RECTANGLE:
-                                            triangulateRectangle(outInput0.getByteArray(), trianglesX0, trianglesY0);
-                                            triangulateRectangle(outInput1.getByteArray(), trianglesX1, trianglesY1);
+                                            triangulateRectangle(bytes0, offset0, trianglesX0, trianglesY0);
+                                            triangulateRectangle(bytes1, offset1, trianglesX1, trianglesY1);
 
                                             boolean intersect = false;
                                             // 2 triangles in a rectangle
@@ -1097,6 +1092,7 @@ public class SpatialIntersectDescriptor extends AbstractScalarFunctionDynamicDes
                         } catch (HyracksDataException hde) {
                             throw new AlgebricksException(hde);
                         }
+                        result.set(resultStorage);
                     }
                 };
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
index 52734b2..07a79c4 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringConcatDescriptor.java
@@ -34,10 +34,12 @@ import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicD
 import org.apache.asterix.runtime.evaluators.common.AsterixListAccessor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.util.string.UTF8StringUtil;
@@ -53,39 +55,42 @@ public class StringConcatDescriptor extends AbstractScalarFunctionDynamicDescrip
     };
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) {
-        return new ICopyEvaluatorFactory() {
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) {
+        return new IScalarEvaluatorFactory() {
 
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopyEvaluator createEvaluator(final IDataOutputProvider output) throws AlgebricksException {
-                return new ICopyEvaluator() {
+            public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws AlgebricksException {
+                return new IScalarEvaluator() {
 
+                    private final ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
                     private final AsterixListAccessor listAccessor = new AsterixListAccessor();
-                    private final DataOutput out = output.getDataOutput();
-                    private final ICopyEvaluatorFactory listEvalFactory = args[0];
-                    private final ArrayBackedValueStorage outInputList = new ArrayBackedValueStorage();
-                    private final ICopyEvaluator evalList = listEvalFactory.createEvaluator(outInputList);
+                    private final DataOutput out = resultStorage.getDataOutput();
+                    private final IScalarEvaluatorFactory listEvalFactory = args[0];
+                    private final IPointable inputArgList = new VoidPointable();
+                    private final IScalarEvaluator evalList = listEvalFactory.createScalarEvaluator(ctx);
                     @SuppressWarnings("unchecked")
                     private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
                             .getSerializerDeserializer(BuiltinType.ANULL);
                     private final byte[] tempLengthArray = new byte[5];
 
                     @Override
-                    public void evaluate(IFrameTupleReference tuple) throws AlgebricksException {
+                    public void evaluate(IFrameTupleReference tuple, IPointable result) throws AlgebricksException {
+                        resultStorage.reset();
                         try {
-                            outInputList.reset();
-                            evalList.evaluate(tuple);
-                            byte[] listBytes = outInputList.getByteArray();
-                            if (listBytes[0] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
-                                    && listBytes[0] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
+                            evalList.evaluate(tuple, inputArgList);
+                            byte[] listBytes = inputArgList.getByteArray();
+                            int listOffset = inputArgList.getStartOffset();
+
+                            if (listBytes[listOffset] != ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG
+                                    && listBytes[listOffset] != ATypeTag.SERIALIZED_UNORDEREDLIST_TYPE_TAG) {
                                 throw new AlgebricksException(AsterixBuiltinFunctions.STRING_CONCAT.getName()
                                         + ": expects input type ORDEREDLIST/UNORDEREDLIST, but got "
-                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[0]));
+                                        + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(listBytes[listOffset]));
                             }
                             try {
-                                listAccessor.reset(listBytes, 0);
+                                listAccessor.reset(listBytes, listOffset);
                             } catch (AsterixException e) {
                                 throw new AlgebricksException(e);
                             }
@@ -103,6 +108,7 @@ public class StringConcatDescriptor extends AbstractScalarFunctionDynamicDescrip
                                     if (itemType != ATypeTag.STRING) {
                                         if (itemType == ATypeTag.NULL) {
                                             nullSerde.serialize(ANull.NULL, out);
+                                            result.set(resultStorage);
                                             return;
                                         }
                                         throw new AlgebricksException(AsterixBuiltinFunctions.STRING_CONCAT.getName()
@@ -129,6 +135,7 @@ public class StringConcatDescriptor extends AbstractScalarFunctionDynamicDescrip
                         } catch (IOException e1) {
                             throw new AlgebricksException(e1.getMessage());
                         }
+                        result.set(resultStorage);
                     }
                 };
             }

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringContainsDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringContainsDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringContainsDescriptor.java
index 2825630..941a84c 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringContainsDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringContainsDescriptor.java
@@ -18,40 +18,38 @@
  */
 package org.apache.asterix.runtime.evaluators.functions;
 
-import java.io.DataOutput;
-
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class StringContainsDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new StringContainsDescriptor();
         }
     };
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
 
-        return new ICopyEvaluatorFactory() {
+        return new IScalarEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
-
-                DataOutput dout = output.getDataOutput();
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
 
-                return new AbstractBinaryStringBoolEval(dout, args[0], args[1],
+                return new AbstractBinaryStringBoolEval(ctx, args[0], args[1],
                         AsterixBuiltinFunctions.STRING_CONTAINS) {
                     @Override
                     protected boolean compute(UTF8StringPointable left, UTF8StringPointable right)

http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEndsWithDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEndsWithDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEndsWithDescriptor.java
index 673ba0f..0c251d4 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEndsWithDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/StringEndsWithDescriptor.java
@@ -18,40 +18,38 @@
  */
 package org.apache.asterix.runtime.evaluators.functions;
 
-import java.io.DataOutput;
-
 import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
 import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 
 public class StringEndsWithDescriptor extends AbstractScalarFunctionDynamicDescriptor {
     private static final long serialVersionUID = 1L;
 
     public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+        @Override
         public IFunctionDescriptor createFunctionDescriptor() {
             return new StringEndsWithDescriptor();
         }
     };
 
     @Override
-    public ICopyEvaluatorFactory createEvaluatorFactory(final ICopyEvaluatorFactory[] args) throws AlgebricksException {
+    public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args)
+            throws AlgebricksException {
 
-        return new ICopyEvaluatorFactory() {
+        return new IScalarEvaluatorFactory() {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public ICopyEvaluator createEvaluator(IDataOutputProvider output) throws AlgebricksException {
-
-                DataOutput dout = output.getDataOutput();
+            public IScalarEvaluator createScalarEvaluator(IHyracksTaskContext ctx) throws AlgebricksException {
 
-                return new AbstractBinaryStringBoolEval(dout, args[0], args[1],
+                return new AbstractBinaryStringBoolEval(ctx, args[0], args[1],
                         AsterixBuiltinFunctions.STRING_ENDS_WITH) {
 
                     @Override