You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/06 18:41:46 UTC

[2/5] asterixdb git commit: [ASTERIXDB-2460][FUN] Fix sum() overflow bug

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
index 132fd54..3c07b47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -18,30 +18,18 @@
  */
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.dataflow.data.nontagged.serde.*;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.runtime.exceptions.IncompatibleTypeException;
+import org.apache.asterix.runtime.exceptions.OverflowException;
 import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -52,187 +40,229 @@ import org.apache.hyracks.data.std.api.IPointable;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
+import java.io.DataOutput;
+import java.io.IOException;
+
 public abstract class AbstractSerializableSumAggregateFunction extends AbstractSerializableAggregateFunction {
+
+    // Handles evaluating and storing/passing serialized data
     protected static final int AGG_TYPE_OFFSET = 0;
     private static final int SUM_OFFSET = 1;
-
     private IPointable inputVal = new VoidPointable();
     private IScalarEvaluator eval;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
+
+    // Aggregate type
+    protected ATypeTag aggType;
+
+    // Result holders
     private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    private AMutableDouble aDouble = new AMutableDouble(0);
+
+    // Flag for output type (if false, double output is used instead of int64)
+    private boolean isUseInt64ForResult = true;
+
+    // Serializer/Deserializer
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer aInt64Serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
     @SuppressWarnings("rawtypes")
-    public ISerializerDeserializer serde;
+    private ISerializerDeserializer aDoubleSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
 
+    // Constructor
     public AbstractSerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
             SourceLocation sourceLoc) throws HyracksDataException {
         super(sourceLoc);
         eval = args[0].createScalarEvaluator(context);
     }
 
+    // Abstract methods
+    protected abstract boolean skipStep(byte[] state, int start); // Skip step
+
+    protected abstract void processNull(byte[] state, int start); // Handle NULL step
+
+    protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step
+
+    protected abstract void finishNull(DataOutput out) throws IOException; // Handle NULL finish
+
+    protected abstract void finishSystemNull(DataOutput out) throws IOException; // Handle SYSTEM_NULL finish
+
+    // Init the values
     @Override
     public void init(DataOutput state) throws HyracksDataException {
         try {
             state.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
-            state.writeDouble(0.0);
+            state.writeLong(0);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
+    // Called for each incoming tuple
     @Override
     public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        // Skip current step
         if (skipStep(state, start)) {
             return;
         }
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
-        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+
+        // Evaluate/Get the data from the tuple
         eval.evaluate(tuple, inputVal);
         byte[] bytes = inputVal.getByteArray();
         int offset = inputVal.getStartOffset();
 
+        // Get the data type tag
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);
+
+        // Handle MISSING and NULL values
         if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) {
             processNull(state, start);
             return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            if (typeTag.ordinal() > aggType.ordinal()) {
-                throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset],
-                        aggType.serialize());
-            } else {
-                throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(),
-                        bytes[offset]);
-            }
-        }
-
-        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-            aggType = typeTag;
         }
 
+        // Calculate based on the incoming data type + handles invalid data type
         switch (typeTag) {
             case TINYINT: {
                 byte val = AInt8SerializerDeserializer.getByte(bytes, offset + 1);
-                sum += val;
+                processInt64Value(state, start, val);
                 break;
             }
             case SMALLINT: {
                 short val = AInt16SerializerDeserializer.getShort(bytes, offset + 1);
-                sum += val;
+                processInt64Value(state, start, val);
                 break;
             }
             case INTEGER: {
                 int val = AInt32SerializerDeserializer.getInt(bytes, offset + 1);
-                sum += val;
+                processInt64Value(state, start, val);
                 break;
             }
             case BIGINT: {
                 long val = AInt64SerializerDeserializer.getLong(bytes, offset + 1);
-                sum += val;
+                processInt64Value(state, start, val);
                 break;
             }
             case FLOAT: {
+                upgradeOutputType();
                 float val = AFloatSerializerDeserializer.getFloat(bytes, offset + 1);
-                sum += val;
+                processFloatValue(state, start, val);
                 break;
             }
             case DOUBLE: {
+                upgradeOutputType();
                 double val = ADoubleSerializerDeserializer.getDouble(bytes, offset + 1);
-                sum += val;
+                processFloatValue(state, start, val);
                 break;
             }
-            case NULL: {
-                aggType = typeTag;
-                break;
-            }
-            case SYSTEM_NULL: {
+            case SYSTEM_NULL:
                 processSystemNull();
                 break;
-            }
             default:
                 throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset]);
         }
-        state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+    }
+
+    // Upgrade the output type
+    private void upgradeOutputType() {
+        isUseInt64ForResult = false;
+    }
+
+    // Process int64 value
+    private void processInt64Value(byte[] state, int start, long value) throws HyracksDataException {
+        // Check the output flag first
+        if (!isUseInt64ForResult) {
+            processFloatValue(state, start, value);
+        }
+
+        // Int64 output, watch out for overflow exception
+        else {
+            try {
+                // Current total
+                long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+                sum = Math.addExact(sum, value);
+
+                // Write the output
+                state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_INT64_TYPE_TAG;
+                BufferSerDeUtil.writeLong(sum, state, start + SUM_OFFSET);
+            } catch (ArithmeticException ignored) {
+                throw new OverflowException(sourceLoc, getIdentifier());
+            }
+        }
+    }
+
+    // Process float value
+    private void processFloatValue(byte[] state, int start, double value) {
+        double sum;
+        aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+        // This checks if the previous written value is bigint, SYSTEM_NULL or double and reads it accordingly
+        // Important: SYSTEM_NULL reads int64 because the written value is int64 as well; see the init() method
+        if (aggType == ATypeTag.BIGINT || aggType == ATypeTag.SYSTEM_NULL) {
+            // Last write was a bigint or SYSTEM_NULL
+            sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+        } else {
+            // Last write was a double
+            sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        }
+
+        // Add the value
+        sum += value;
+
+        // Write the output
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG;
         BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
     }
 
     @SuppressWarnings("unchecked")
     @Override
+    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        // finishPartial() has identical behavior to finish()
+        finishFinal(state, start, len, out);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
     public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
-        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
-        double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+        // Finish
+        finishFinal(state, start, len, out);
+    }
+
+    @SuppressWarnings({ "unchecked", "unused" })
+    private void finishFinal(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
         try {
-            switch (aggType) {
-                case TINYINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, out);
-                    break;
+            // aggType is SYSTEM_NULL (ran over zero values)
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values.");
                 }
-                case SMALLINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, out);
-                    break;
-                }
-                case INTEGER: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, out);
-                    break;
-                }
-                case BIGINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, out);
-                    break;
-                }
-                case FLOAT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, out);
-                    break;
-                }
-                case DOUBLE: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+
+                finishSystemNull(out);
+            }
+            // aggType is NULL
+            else if (aggType == ATypeTag.NULL) {
+                finishNull(out);
+            }
+            // Pass the result
+            else {
+                if (isUseInt64ForResult) {
+                    long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+                    aInt64.setValue(sum);
+                    aInt64Serde.serialize(aInt64, out);
+                } else {
+                    double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
                     aDouble.setValue(sum);
-                    serde.serialize(aDouble, out);
-                    break;
-                }
-                case NULL: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, out);
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    finishSystemNull(out);
-                    break;
+                    aDoubleSerde.serialize(aDouble, out);
                 }
-                default:
-                    throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
-    @Override
-    public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
-        finish(state, start, len, out);
-    }
-
-    protected boolean skipStep(byte[] state, int start) {
-        return false;
+    // Function identifier
+    private FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SUM;
     }
-
-    protected abstract void processNull(byte[] state, int start);
-
-    protected abstract void processSystemNull() throws HyracksDataException;
-
-    protected abstract void finishSystemNull(DataOutput out) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..3e74308
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableGlobalSqlSumAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSqlSumAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SERIAL_GLOBAL_SQL_SUM;
+    }
+
+    @Override
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) {
+        return new ISerializedAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                return new SerializableGlobalSqlSumAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
new file mode 100644
index 0000000..2d8fa61
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableGlobalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+    public SerializableGlobalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finish(state, start, len, out);
+    }
+
+    // Whether the current step should be skipped
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+    // Handle NULL step
+    @Override
+    protected void processNull(byte[] state, int start) {
+        // Do nothing
+    }
+
+    // Handle SYSTEM_NULL step
+    @Override
+    protected void processSystemNull() {
+        // Do nothing
+    }
+
+    // Handle NULL finish
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
new file mode 100644
index 0000000..24bfcc9
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableGlobalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSumAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SERIAL_GLOBAL_SUM;
+    }
+
+    @Override
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) {
+        return new ISerializedAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                return new SerializableGlobalSumAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
new file mode 100644
index 0000000..35e2f89
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableGlobalSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+    public SerializableGlobalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finish(state, start, len, out);
+    }
+
+    // Whether the current step should be skipped (true once the aggregate state is NULL)
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return aggType == ATypeTag.NULL;
+    }
+
+    // Handle NULL step
+    @Override
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+    }
+
+    // Handle SYSTEM_NULL step
+    @Override
+    protected void processSystemNull() {
+        // Do nothing
+    }
+
+    // Handle NULL finish
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..8691756
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableIntermediateSqlSumAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+    // Descriptor for BuiltinFunctions.SERIAL_INTERMEDIATE_SQL_SUM; FACTORY creates instances via the constructor
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSqlSumAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SERIAL_INTERMEDIATE_SQL_SUM;
+    }
+
+    @Override
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) {
+        return new ISerializedAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+            // Creates a new evaluator on every call; sourceLoc is not declared in this class (likely inherited)
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                return new SerializableIntermediateSqlSumAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
new file mode 100644
index 0000000..04eeed3
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableIntermediateSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    // Intermediate SQL sum: never skips a step, ignores NULL/SYSTEM_NULL inputs, finishes via finishPartial()
+    public SerializableIntermediateSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple; delegates unchanged to the superclass step()
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation: routed to the superclass finishPartial() instead of its finish()
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finishPartial(state, start, len, out);
+    }
+
+    // Is skip: always false, the state is not even inspected
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+    // Handle NULL step: no-op, the state is left as it is
+    @Override
+    protected void processNull(byte[] state, int start) {
+        // Do nothing
+    }
+
+    // Handle SYSTEM_NULL step: no-op as well
+    @Override
+    protected void processSystemNull() {
+        // Do nothing
+    }
+
+    // Handle NULL finish: write the single serialized NULL type tag byte to the output
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish: write the single serialized SYSTEM_NULL type tag byte to the output
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
new file mode 100644
index 0000000..d1d2c0f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableIntermediateSumAggregateDescriptor
+        extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+    // Descriptor for BuiltinFunctions.SERIAL_INTERMEDIATE_SUM; FACTORY creates instances via the constructor
+    private static final long serialVersionUID = 1L;
+    public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSumAggregateDescriptor::new;
+
+    @Override
+    public FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SERIAL_INTERMEDIATE_SUM;
+    }
+
+    @Override
+    public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+            final IScalarEvaluatorFactory[] args) {
+        return new ISerializedAggregateEvaluatorFactory() {
+            private static final long serialVersionUID = 1L;
+            // Creates a new evaluator on every call; sourceLoc is not declared in this class (likely inherited)
+            @Override
+            public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+                    throws HyracksDataException {
+                return new SerializableIntermediateSumAggregateFunction(args, ctx, sourceLoc);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
new file mode 100644
index 0000000..4adc845
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableIntermediateSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    // Intermediate sum: a NULL input marks the state as NULL, SYSTEM_NULL is ignored, finish uses finishPartial()
+    public SerializableIntermediateSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple; delegates unchanged to the superclass step()
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation: routed to the superclass finishPartial() instead of its finish()
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finishPartial(state, start, len, out);
+    }
+
+    // Is skip: true when the aggregate type tag stored in the state at AGG_TYPE_OFFSET is already NULL
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return aggType == ATypeTag.NULL;
+    }
+
+    // Handle NULL step: overwrite the state's aggregate type tag with the serialized NULL tag
+    @Override
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+    }
+
+    // Handle SYSTEM_NULL step: deliberately a no-op, the state is left as it is
+    @Override
+    protected void processSystemNull() {
+        // Do nothing
+    }
+
+    // Handle NULL finish: write the single serialized NULL type tag byte to the output
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish: write the single serialized SYSTEM_NULL type tag byte to the output
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 3798b49..79b31c4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
-        @Override
-        public IFunctionDescriptor createFunctionDescriptor() {
-            return new SerializableLocalSqlSumAggregateDescriptor();
-        }
-    };
+    public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSqlSumAggregateDescriptor::new;
 
     @Override
     public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializ
             @Override
             public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new SerializableSqlSumAggregateFunction(args, true, ctx, sourceLoc);
+                return new SerializableLocalSqlSumAggregateFunction(args, ctx, sourceLoc);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
new file mode 100644
index 0000000..9aea883
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableLocalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    // Local SQL sum: never skips a step, ignores NULL inputs, rejects SYSTEM_NULL inputs, finishes via finishPartial()
+    public SerializableLocalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple; delegates unchanged to the superclass step()
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation: routed to the superclass finishPartial() instead of its finish()
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finishPartial(state, start, len, out);
+    }
+
+    // Is skip: always false, the state is not even inspected
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+    // Handle NULL step: no-op, the state is left as it is
+    @Override
+    protected void processNull(byte[] state, int start) {
+        // Do nothing
+    }
+
+    // Handle SYSTEM_NULL step: always rejected; the error names SQL_SUM because this is the SQL sum variant
+    @Override
+    protected void processSystemNull() throws HyracksDataException {
+        throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM,
+                ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+
+    // Handle NULL finish: write the single serialized NULL type tag byte to the output
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish: write the single serialized SYSTEM_NULL type tag byte to the output
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 54d24f7..512d211 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -19,25 +19,19 @@
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
-        @Override
-        public IFunctionDescriptor createFunctionDescriptor() {
-            return new SerializableLocalSumAggregateDescriptor();
-        }
-    };
+    public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSumAggregateDescriptor::new;
 
     @Override
     public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableLocalSumAggregateDescriptor extends AbstractSerializabl
             @Override
             public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new SerializableSumAggregateFunction(args, true, ctx, sourceLoc);
+                return new SerializableLocalSumAggregateFunction(args, ctx, sourceLoc);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
new file mode 100644
index 0000000..8a9e0f9
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableLocalSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+    // Local sum: a NULL input marks the state as NULL, SYSTEM_NULL inputs are rejected, finish uses finishPartial()
+    public SerializableLocalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
+        super(args, context, sourceLoc);
+    }
+
+    // Called for each incoming tuple; delegates unchanged to the superclass step()
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation: routed to the superclass finishPartial() instead of its finish()
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finishPartial(state, start, len, out);
+    }
+
+    // Is skip: true when the aggregate type tag stored in the state at AGG_TYPE_OFFSET is already NULL
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+        return aggType == ATypeTag.NULL;
+    }
+
+    // Handle NULL step: overwrite the state's aggregate type tag with the serialized NULL tag
+    @Override
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+    }
+
+    // Handle SYSTEM_NULL step: always rejected with an UnsupportedItemTypeException reported for SUM
+    @Override
+    protected void processSystemNull() throws HyracksDataException {
+        throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM,
+                ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+
+    // Handle NULL finish: write the single serialized NULL type tag byte to the output
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
+
+    // Handle SYSTEM_NULL finish: write the single serialized SYSTEM_NULL type tag byte to the output
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index d51a6fc..0dc5e17 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
-        @Override
-        public IFunctionDescriptor createFunctionDescriptor() {
-            return new SerializableSqlSumAggregateDescriptor();
-        }
-    };
+    public static final IFunctionDescriptorFactory FACTORY = SerializableSqlSumAggregateDescriptor::new;
 
     @Override
     public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableA
             @Override
             public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new SerializableSqlSumAggregateFunction(args, false, ctx, sourceLoc);
+                return new SerializableSqlSumAggregateFunction(args, ctx, sourceLoc);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 38033f0..3062a37 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -18,54 +18,62 @@
  */
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
 
 public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
-    private final boolean isLocalAgg;
 
-    public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
-            IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException {
+    public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
         super(args, context, sourceLoc);
-        this.isLocalAgg = isLocalAgg;
     }
 
+    // Called for each incoming tuple
+    @Override
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finish(state, start, len, out);
+    }
+
+    // Whether the current step should be skipped; this implementation never skips
+    @Override
+    protected boolean skipStep(byte[] state, int start) {
+        return false;
+    }
+
+    // Handle NULL step
     @Override
     protected void processNull(byte[] state, int start) {
+        // Do nothing
     }
 
+    // Handle SYSTEM_NULL step
     @Override
-    protected void processSystemNull() throws HyracksDataException {
-        // For global aggregates simply ignore system null here,
-        // but if all input value are system null, then we should return
-        // null in finish().
-        if (isLocalAgg) {
-            throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM,
-                    ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
-        }
+    protected void processSystemNull() {
+        // Do nothing
     }
 
-    @SuppressWarnings("unchecked")
+    // Handle NULL finish
     @Override
-    protected void finishSystemNull(DataOutput out) throws IOException {
-        // Empty stream. For local agg return system null. For global agg return null.
-        if (isLocalAgg) {
-            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-        } else {
-            serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-            serde.serialize(ANull.NULL, out);
-        }
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
     }
 
+    // Handle SYSTEM_NULL finish
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index 43eea5b..7619ca8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
 import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
 import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
 import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
 
     private static final long serialVersionUID = 1L;
-    public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
-        @Override
-        public IFunctionDescriptor createFunctionDescriptor() {
-            return new SerializableSumAggregateDescriptor();
-        }
-    };
+    public static final IFunctionDescriptorFactory FACTORY = SerializableSumAggregateDescriptor::new;
 
     @Override
     public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableSumAggregateDescriptor extends AbstractSerializableAggr
             @Override
             public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
                     throws HyracksDataException {
-                return new SerializableSumAggregateFunction(args, false, ctx, sourceLoc);
+                return new SerializableSumAggregateFunction(args, ctx, sourceLoc);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 278914f..0be49e7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -18,62 +18,64 @@
  */
 package org.apache.asterix.runtime.aggregates.serializable.std;
 
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
 
 public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
-    private final boolean isLocalAgg;
 
-    public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
-            IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException {
+    public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+            SourceLocation sourceLoc) throws HyracksDataException {
         super(args, context, sourceLoc);
-        this.isLocalAgg = isLocalAgg;
     }
 
+    // Called for each incoming tuple
     @Override
-    protected void processNull(byte[] state, int start) {
-        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+    public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+        super.step(tuple, state, start, len);
+    }
+
+    // Finish calculation
+    @Override
+    public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+        super.finish(state, start, len, out);
     }
 
+    // Whether the current step should be skipped: true once the aggregate type in the state is NULL
     @Override
     protected boolean skipStep(byte[] state, int start) {
         ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
         return aggType == ATypeTag.NULL;
     }
 
+    // Handle NULL step
     @Override
-    protected void processSystemNull() throws HyracksDataException {
-        // For global aggregates simply ignore system null here,
-        // but if all input value are system null, then we should return
-        // null in finish().
-        if (isLocalAgg) {
-            throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM,
-                    ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
-        }
+    protected void processNull(byte[] state, int start) {
+        state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
     }
 
-    @SuppressWarnings("unchecked")
+    // Handle SYSTEM_NULL step
     @Override
-    protected void finishSystemNull(DataOutput out) throws IOException {
-        // Empty stream. For local agg return system null. For global agg return null.
-        if (isLocalAgg) {
-            out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
-        } else {
-            serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-            serde.serialize(ANull.NULL, out);
-        }
+    protected void processSystemNull() {
+        // Do nothing
+    }
+
+    // Handle NULL finish
+    @Override
+    protected void finishNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
     }
 
+    // Handle SYSTEM_NULL finish
+    @Override
+    protected void finishSystemNull(DataOutput out) throws IOException {
+        out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index 037e307..836c24e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -18,29 +18,18 @@
  */
 package org.apache.asterix.runtime.aggregates.std;
 
-import java.io.IOException;
-
-import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.dataflow.data.nontagged.serde.*;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
 import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.functions.BuiltinFunctions;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.BuiltinType;
 import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.runtime.exceptions.IncompatibleTypeException;
+import org.apache.asterix.runtime.exceptions.OverflowException;
 import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -52,91 +41,119 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
+import java.io.IOException;
+
 public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunction {
+
+    // Handles evaluating and storing/passing serialized data
     protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
     private IPointable inputVal = new VoidPointable();
     private IScalarEvaluator eval;
-    private double sum;
+
+    // Aggregate type
     protected ATypeTag aggType;
-    private AMutableDouble aDouble = new AMutableDouble(0);
-    private AMutableFloat aFloat = new AMutableFloat(0);
+
+    // Result holders
+    private long sumInt64;
+    private double sumDouble;
     private AMutableInt64 aInt64 = new AMutableInt64(0);
-    private AMutableInt32 aInt32 = new AMutableInt32(0);
-    private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
-    private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+    private AMutableDouble aDouble = new AMutableDouble(0);
+
+    // Output type flag: true emits the sum as int64, false emits it as double
+    private boolean isUseInt64ForResult = true;
+
+    // Serializer/Deserializer
     @SuppressWarnings("rawtypes")
-    protected ISerializerDeserializer serde;
+    private ISerializerDeserializer aInt64Serde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+    @SuppressWarnings("rawtypes")
+    private ISerializerDeserializer aDoubleSerde =
+            SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
 
+    // Constructor
     public AbstractSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
             SourceLocation sourceLoc) throws HyracksDataException {
         super(sourceLoc);
         eval = args[0].createScalarEvaluator(context);
     }
 
+    // Abstract methods
+    protected abstract boolean skipStep(); // Skip step
+
+    protected abstract void processNull(); // Handle NULL step
+
+    protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step
+
+    protected abstract void finishNull(IPointable result) throws IOException; // Handle NULL finish
+
+    protected abstract void finishSystemNull(IPointable result) throws IOException; // Handle SYSTEM_NULL finish
+
+    // Init the values
     @Override
     public void init() throws HyracksDataException {
         aggType = ATypeTag.SYSTEM_NULL;
-        sum = 0.0;
+        sumInt64 = 0;
+        sumDouble = 0.0;
     }
 
+    // Called for each incoming tuple
     @Override
     public void step(IFrameTupleReference tuple) throws HyracksDataException {
+        // Skip current step
         if (skipStep()) {
             return;
         }
+
+        // Evaluate/Get the data from the tuple
         eval.evaluate(tuple, inputVal);
         byte[] data = inputVal.getByteArray();
         int offset = inputVal.getStartOffset();
 
+        // Get the data type tag
         ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
+
+        // Handle MISSING and NULL values
         if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) {
             processNull();
             return;
-        } else if (aggType == ATypeTag.SYSTEM_NULL) {
-            aggType = typeTag;
-        } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
-            if (typeTag.ordinal() > aggType.ordinal()) {
-                throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize(),
-                        aggType.serialize());
-            } else {
-                throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(),
-                        typeTag.serialize());
-            }
         }
-
-        if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+        // Value is neither MISSING nor NULL: adopt its type while the aggregate type is still SYSTEM_NULL
+        else if (aggType == ATypeTag.SYSTEM_NULL) {
             aggType = typeTag;
         }
 
+        // Calculate based on the incoming data type + handles invalid data type
         switch (typeTag) {
             case TINYINT: {
                 byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
-                sum += val;
+                processInt64Value(val);
                 break;
             }
             case SMALLINT: {
                 short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
-                sum += val;
+                processInt64Value(val);
                 break;
             }
             case INTEGER: {
                 int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
-                sum += val;
+                processInt64Value(val);
                 break;
             }
             case BIGINT: {
                 long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
-                sum += val;
+                processInt64Value(val);
                 break;
             }
             case FLOAT: {
+                upgradeOutputType();
                 float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
-                sum += val;
+                processFloatValue(val);
                 break;
             }
             case DOUBLE: {
+                upgradeOutputType();
                 double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
-                sum += val;
+                processFloatValue(val);
                 break;
             }
             case SYSTEM_NULL: {
@@ -144,83 +161,95 @@ public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunc
                 break;
             }
             default: {
-                throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
+                throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize());
             }
         }
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void finish(IPointable result) throws HyracksDataException {
-        resultStorage.reset();
-        try {
-            switch (aggType) {
-                case TINYINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
-                    aInt8.setValue((byte) sum);
-                    serde.serialize(aInt8, resultStorage.getDataOutput());
-                    break;
-                }
-                case SMALLINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
-                    aInt16.setValue((short) sum);
-                    serde.serialize(aInt16, resultStorage.getDataOutput());
-                    break;
-                }
-                case INTEGER: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
-                    aInt32.setValue((int) sum);
-                    serde.serialize(aInt32, resultStorage.getDataOutput());
-                    break;
-                }
-                case BIGINT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
-                    aInt64.setValue((long) sum);
-                    serde.serialize(aInt64, resultStorage.getDataOutput());
-                    break;
-                }
-                case FLOAT: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
-                    aFloat.setValue((float) sum);
-                    serde.serialize(aFloat, resultStorage.getDataOutput());
-                    break;
-                }
-                case DOUBLE: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
-                    aDouble.setValue(sum);
-                    serde.serialize(aDouble, resultStorage.getDataOutput());
-                    break;
-                }
-                case NULL: {
-                    serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
-                    serde.serialize(ANull.NULL, resultStorage.getDataOutput());
-                    break;
-                }
-                case SYSTEM_NULL: {
-                    finishSystemNull();
-                    break;
-                }
-                default:
-                    throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
+    // Upgrade the output type
+    private void upgradeOutputType() {
+        isUseInt64ForResult = false;
+    }
+
+    // Process int64 value
+    private void processInt64Value(long value) throws HyracksDataException {
+        // Check the output flag first
+        if (!isUseInt64ForResult) {
+            processFloatValue(value);
+        }
+        // Int64 output, watch out for overflow exception
+        else {
+            try {
+                sumInt64 = Math.addExact(sumInt64, value);
+                sumDouble = sumInt64; // Keep the sumDouble variable up-to-date as well
+            } catch (ArithmeticException ignored) {
+                throw new OverflowException(sourceLoc, getIdentifier());
             }
-        } catch (IOException e) {
-            throw HyracksDataException.create(e);
         }
-        result.set(resultStorage);
     }
 
+    // Process float value
+    private void processFloatValue(double value) {
+        // If this method is called, it means the output is going to be a double, no need to check the output type flag
+        // and the sumInt64 can be ignored
+        sumDouble += value;
+    }
+
+    // Called for partial calculations
+    @SuppressWarnings("unchecked")
     @Override
     public void finishPartial(IPointable result) throws HyracksDataException {
-        finish(result);
+        // finishPartial() has identical behavior to finish()
+        finishFinal(result);
     }
 
-    protected boolean skipStep() {
-        return false;
+    // Called for final calculations
+    @SuppressWarnings("unchecked")
+    @Override
+    public void finish(IPointable result) throws HyracksDataException {
+        // Finish
+        finishFinal(result);
     }
 
-    protected abstract void processNull();
+    // Shared implementation behind finish() and finishPartial()
+    @SuppressWarnings("unchecked")
+    private void finishFinal(IPointable result) throws HyracksDataException {
+        // Reset the result storage
+        resultStorage.reset();
+
+        try {
+            // aggType is SYSTEM_NULL
+            if (aggType == ATypeTag.SYSTEM_NULL) {
+                if (GlobalConfig.DEBUG) {
+                    GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values.");
+                }
 
-    protected abstract void processSystemNull() throws HyracksDataException;
+                finishSystemNull(result);
+            }
+            // aggType is NULL
+            else if (aggType == ATypeTag.NULL) {
+                finishNull(result);
+            }
+            // Pass the result
+            else {
+                // Output type based on the flag
+                if (isUseInt64ForResult) {
+                    aInt64.setValue(sumInt64);
+                    aInt64Serde.serialize(aInt64, resultStorage.getDataOutput());
+                    result.set(resultStorage);
+                } else {
+                    aDouble.setValue(sumDouble);
+                    aDoubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+                    result.set(resultStorage);
+                }
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
 
-    protected abstract void finishSystemNull() throws IOException;
+    // Function identifier
+    private FunctionIdentifier getIdentifier() {
+        return BuiltinFunctions.SUM;
+    }
 }