You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by dl...@apache.org on 2018/11/06 18:41:46 UTC
[2/5] asterixdb git commit: [ASTERIXDB-2460][FUN] Fix sum() overflow bug
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
index 132fd54..3c07b47 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/AbstractSerializableSumAggregateFunction.java
@@ -18,30 +18,18 @@
*/
package org.apache.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.dataflow.data.nontagged.serde.*;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.runtime.exceptions.IncompatibleTypeException;
+import org.apache.asterix.runtime.exceptions.OverflowException;
import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -52,187 +40,229 @@ import org.apache.hyracks.data.std.api.IPointable;
import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import java.io.DataOutput;
+import java.io.IOException;
+
public abstract class AbstractSerializableSumAggregateFunction extends AbstractSerializableAggregateFunction {
+
+ // Handles evaluating and storing/passing serialized data
protected static final int AGG_TYPE_OFFSET = 0;
private static final int SUM_OFFSET = 1;
-
private IPointable inputVal = new VoidPointable();
private IScalarEvaluator eval;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
+
+ // Aggregate type
+ protected ATypeTag aggType;
+
+ // Result holders
private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ private AMutableDouble aDouble = new AMutableDouble(0);
+
+ // Output type flag (int64 result while true; double result once a FLOAT or DOUBLE input has been seen)
+ private boolean isUseInt64ForResult = true;
+
+ // Serializer/Deserializer
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer aInt64Serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
@SuppressWarnings("rawtypes")
- public ISerializerDeserializer serde;
+ private ISerializerDeserializer aDoubleSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ // Constructor
public AbstractSerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
SourceLocation sourceLoc) throws HyracksDataException {
super(sourceLoc);
eval = args[0].createScalarEvaluator(context);
}
+ // Abstract methods
+ protected abstract boolean skipStep(byte[] state, int start); // Skip step
+
+ protected abstract void processNull(byte[] state, int start); // Handle NULL step
+
+ protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step
+
+ protected abstract void finishNull(DataOutput out) throws IOException; // Handle NULL finish
+
+ protected abstract void finishSystemNull(DataOutput out) throws IOException; // Handle SYSTEM_NULL finish
+
+ // Init the values
@Override
public void init(DataOutput state) throws HyracksDataException {
try {
state.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
- state.writeDouble(0.0);
+ state.writeLong(0);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
+ // Called for each incoming tuple
@Override
public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ // Skip current step
if (skipStep(state, start)) {
return;
}
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
- double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+
+ // Evaluate/Get the data from the tuple
eval.evaluate(tuple, inputVal);
byte[] bytes = inputVal.getByteArray();
int offset = inputVal.getStartOffset();
+ // Get the data type tag
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(bytes[offset]);
+
+ // Handle MISSING and NULL values
if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) {
processNull(state, start);
return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- if (typeTag.ordinal() > aggType.ordinal()) {
- throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset],
- aggType.serialize());
- } else {
- throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(),
- bytes[offset]);
- }
- }
-
- if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- aggType = typeTag;
}
+ // Calculate based on the incoming data type + handles invalid data type
switch (typeTag) {
case TINYINT: {
byte val = AInt8SerializerDeserializer.getByte(bytes, offset + 1);
- sum += val;
+ processInt64Value(state, start, val);
break;
}
case SMALLINT: {
short val = AInt16SerializerDeserializer.getShort(bytes, offset + 1);
- sum += val;
+ processInt64Value(state, start, val);
break;
}
case INTEGER: {
int val = AInt32SerializerDeserializer.getInt(bytes, offset + 1);
- sum += val;
+ processInt64Value(state, start, val);
break;
}
case BIGINT: {
long val = AInt64SerializerDeserializer.getLong(bytes, offset + 1);
- sum += val;
+ processInt64Value(state, start, val);
break;
}
case FLOAT: {
+ upgradeOutputType();
float val = AFloatSerializerDeserializer.getFloat(bytes, offset + 1);
- sum += val;
+ processFloatValue(state, start, val);
break;
}
case DOUBLE: {
+ upgradeOutputType();
double val = ADoubleSerializerDeserializer.getDouble(bytes, offset + 1);
- sum += val;
+ processFloatValue(state, start, val);
break;
}
- case NULL: {
- aggType = typeTag;
- break;
- }
- case SYSTEM_NULL: {
+ case SYSTEM_NULL:
processSystemNull();
break;
- }
default:
throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, bytes[offset]);
}
- state[start + AGG_TYPE_OFFSET] = aggType.serialize();
+ }
+
+ // Upgrade the output type
+ private void upgradeOutputType() {
+ isUseInt64ForResult = false;
+ }
+
+ // Process int64 value
+ private void processInt64Value(byte[] state, int start, long value) throws HyracksDataException {
+ // Check the output flag first
+ if (!isUseInt64ForResult) {
+ processFloatValue(state, start, value);
+ }
+
+ // Int64 output, watch out for overflow exception
+ else {
+ try {
+ // Current total
+ long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+ sum = Math.addExact(sum, value);
+
+ // Write the output
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_INT64_TYPE_TAG;
+ BufferSerDeUtil.writeLong(sum, state, start + SUM_OFFSET);
+ } catch (ArithmeticException ignored) {
+ throw new OverflowException(sourceLoc, getIdentifier());
+ }
+ }
+ }
+
+ // Process float value
+ private void processFloatValue(byte[] state, int start, double value) {
+ double sum;
+ aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
+ // This checks if the previous written value is bigint, SYSTEM_NULL or double and reads it accordingly
+ // Important: SYSTEM_NULL reads int64 because the written value is int64 as well, check the init() method
+ if (aggType == ATypeTag.BIGINT || aggType == ATypeTag.SYSTEM_NULL) {
+ // Last write was a bigint or SYSTEM_NULL
+ sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+ } else {
+ // Last write was a double
+ sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ }
+
+ // Add the value
+ sum += value;
+
+ // Write the output
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG;
BufferSerDeUtil.writeDouble(sum, state, start + SUM_OFFSET);
}
@SuppressWarnings("unchecked")
@Override
+ public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ // finishPartial() has identical behavior to finish()
+ finishFinal(state, start, len, out);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
- ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
- double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
+ // Finish
+ finishFinal(state, start, len, out);
+ }
+
+ @SuppressWarnings({ "unchecked", "unused" })
+ private void finishFinal(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+
try {
- switch (aggType) {
- case TINYINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
- break;
+ // aggType is SYSTEM_NULL (ran over zero values)
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values.");
}
- case SMALLINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
- break;
- }
- case INTEGER: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
- break;
- }
- case BIGINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
- break;
- }
- case FLOAT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
- break;
- }
- case DOUBLE: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+
+ finishSystemNull(out);
+ }
+ // aggType is NULL
+ else if (aggType == ATypeTag.NULL) {
+ finishNull(out);
+ }
+ // Pass the result
+ else {
+ if (isUseInt64ForResult) {
+ long sum = BufferSerDeUtil.getLong(state, start + SUM_OFFSET);
+ aInt64.setValue(sum);
+ aInt64Serde.serialize(aInt64, out);
+ } else {
+ double sum = BufferSerDeUtil.getDouble(state, start + SUM_OFFSET);
aDouble.setValue(sum);
- serde.serialize(aDouble, out);
- break;
- }
- case NULL: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- break;
- }
- case SYSTEM_NULL: {
- finishSystemNull(out);
- break;
+ aDoubleSerde.serialize(aDouble, out);
}
- default:
- throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
}
} catch (IOException e) {
throw HyracksDataException.create(e);
}
}
- @Override
- public void finishPartial(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
- finish(state, start, len, out);
- }
-
- protected boolean skipStep(byte[] state, int start) {
- return false;
+ // Function identifier
+ private FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SUM;
}
-
- protected abstract void processNull(byte[] state, int start);
-
- protected abstract void processSystemNull() throws HyracksDataException;
-
- protected abstract void finishSystemNull(DataOutput out) throws IOException;
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..3e74308
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableGlobalSqlSumAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSqlSumAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SERIAL_GLOBAL_SQL_SUM;
+ }
+
+ @Override
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) {
+ return new ISerializedAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new SerializableGlobalSqlSumAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
new file mode 100644
index 0000000..2d8fa61
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlSumAggregateFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableGlobalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableGlobalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Finish calculation
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finish(state, start, len, out);
+ }
+
+ // Whether to skip the current step (never skipped here)
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
+ // Handle NULL step
+ @Override
+ protected void processNull(byte[] state, int start) {
+ // Do nothing
+ }
+
+ // Handle SYSTEM_NULL step
+ @Override
+ protected void processSystemNull() {
+ // Do nothing
+ }
+
+ // Handle NULL finish
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // Handle SYSTEM_NULL finish
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
new file mode 100644
index 0000000..24bfcc9
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateDescriptor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableGlobalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = SerializableGlobalSumAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SERIAL_GLOBAL_SUM;
+ }
+
+ @Override
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) {
+ return new ISerializedAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new SerializableGlobalSumAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
new file mode 100644
index 0000000..35e2f89
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSumAggregateFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableGlobalSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableGlobalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Finish calculation
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finish(state, start, len, out);
+ }
+
+ // Whether to skip the current step (skipped once the aggregate state is NULL)
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return aggType == ATypeTag.NULL;
+ }
+
+ // Handle NULL step
+ @Override
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+ }
+
+ // Handle SYSTEM_NULL step
+ @Override
+ protected void processSystemNull() {
+ // Do nothing
+ }
+
+ // Handle NULL finish
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // Handle SYSTEM_NULL finish
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
new file mode 100644
index 0000000..8691756
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableIntermediateSqlSumAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSqlSumAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SERIAL_INTERMEDIATE_SQL_SUM;
+ }
+
+ @Override
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) {
+ return new ISerializedAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new SerializableIntermediateSqlSumAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
new file mode 100644
index 0000000..04eeed3
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlSumAggregateFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableIntermediateSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableIntermediateSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple; delegates unchanged to the shared step logic of the parent class
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Intermediate stage: the result is emitted through the parent's finishPartial(), not its finish()
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finishPartial(state, start, len, out);
+ }
+
+ // Skip check: always false for this variant (presumably consulted by the parent's step() -- confirm there)
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
+ // NULL input: the aggregation state is deliberately left untouched
+ @Override
+ protected void processNull(byte[] state, int start) {
+ // Do nothing
+ }
+
+ // SYSTEM_NULL input: deliberately ignored at the intermediate stage
+ @Override
+ protected void processSystemNull() {
+ // Do nothing
+ }
+
+ // NULL result: writes only the serialized NULL type tag
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // SYSTEM_NULL result: writes only the serialized SYSTEM_NULL type tag
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
new file mode 100644
index 0000000..d1d2c0f
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateDescriptor.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
+import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class SerializableIntermediateSumAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+ // Descriptor that ties the SERIAL_INTERMEDIATE_SUM builtin to its serialized aggregate evaluator.
+ private static final long serialVersionUID = 1L;
+ public static final IFunctionDescriptorFactory FACTORY = SerializableIntermediateSumAggregateDescriptor::new;
+
+ @Override
+ public FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SERIAL_INTERMEDIATE_SUM;
+ }
+
+ @Override
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) {
+ return new ISerializedAggregateEvaluatorFactory() {
+ private static final long serialVersionUID = 1L;
+ // Each call builds a new SerializableIntermediateSumAggregateFunction from args, ctx and sourceLoc.
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws HyracksDataException {
+ return new SerializableIntermediateSumAggregateFunction(args, ctx, sourceLoc);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
new file mode 100644
index 0000000..4adc845
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSumAggregateFunction.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableIntermediateSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableIntermediateSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple; delegates unchanged to the shared step logic of the parent class
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Intermediate stage: the result is emitted through the parent's finishPartial(), not its finish()
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finishPartial(state, start, len, out);
+ }
+
+ // Skip check: true once the type tag stored at AGG_TYPE_OFFSET in the state is NULL
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return aggType == ATypeTag.NULL;
+ }
+
+ // NULL input: stores the NULL type tag in the state, which is exactly what skipStep() tests for
+ @Override
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+ }
+
+ // SYSTEM_NULL input: deliberately ignored at the intermediate stage
+ @Override
+ protected void processSystemNull() {
+ // Do nothing
+ }
+
+ // NULL result: writes only the serialized NULL type tag
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // SYSTEM_NULL result: writes only the serialized SYSTEM_NULL type tag
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 3798b49..79b31c4 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
package org.apache.asterix.runtime.aggregates.serializable.std;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override
- public IFunctionDescriptor createFunctionDescriptor() {
- return new SerializableLocalSqlSumAggregateDescriptor();
- }
- };
+ public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSqlSumAggregateDescriptor::new;
@Override
public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializ
@Override
public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SerializableSqlSumAggregateFunction(args, true, ctx, sourceLoc);
+ return new SerializableLocalSqlSumAggregateFunction(args, ctx, sourceLoc);
}
};
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
new file mode 100644
index 0000000..9aea883
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableLocalSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableLocalSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple; delegates unchanged to the shared step logic of the parent class
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Local stage: the result is emitted through the parent's finishPartial(), not its finish()
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finishPartial(state, start, len, out);
+ }
+
+ // Skip check: always false for this variant (presumably consulted by the parent's step() -- confirm there)
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
+ // NULL input: the aggregation state is deliberately left untouched
+ @Override
+ protected void processNull(byte[] state, int start) {
+ // Do nothing
+ }
+
+ // SYSTEM_NULL input is rejected by the local aggregate; the error names SQL_SUM, the function implemented here
+ @Override
+ protected void processSystemNull() throws HyracksDataException {
+ throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM,
+ ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+
+ // NULL result: writes only the serialized NULL type tag
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // SYSTEM_NULL result: writes only the serialized SYSTEM_NULL type tag
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index 54d24f7..512d211 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -19,25 +19,19 @@
package org.apache.asterix.runtime.aggregates.serializable.std;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override
- public IFunctionDescriptor createFunctionDescriptor() {
- return new SerializableLocalSumAggregateDescriptor();
- }
- };
+ public static final IFunctionDescriptorFactory FACTORY = SerializableLocalSumAggregateDescriptor::new;
@Override
public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableLocalSumAggregateDescriptor extends AbstractSerializabl
@Override
public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SerializableSumAggregateFunction(args, true, ctx, sourceLoc);
+ return new SerializableLocalSumAggregateFunction(args, ctx, sourceLoc);
}
};
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
new file mode 100644
index 0000000..8a9e0f9
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.aggregates.serializable.std;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class SerializableLocalSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
+
+ public SerializableLocalSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
+ super(args, context, sourceLoc);
+ }
+
+ // Called for each incoming tuple; delegates unchanged to the shared step logic of the parent class
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Local stage: the result is emitted through the parent's finishPartial(), not its finish()
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finishPartial(state, start, len, out);
+ }
+
+ // Skip check: true once the type tag stored at AGG_TYPE_OFFSET in the state is NULL
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
+ return aggType == ATypeTag.NULL;
+ }
+
+ // NULL input: stores the NULL type tag in the state, which is exactly what skipStep() tests for
+ @Override
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+ }
+
+ // SYSTEM_NULL input is rejected by the local aggregate with an UnsupportedItemTypeException naming SUM
+ @Override
+ protected void processSystemNull() throws HyracksDataException {
+ throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM,
+ ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+
+ // NULL result: writes only the serialized NULL type tag
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
+
+ // SYSTEM_NULL result: writes only the serialized SYSTEM_NULL type tag
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index d51a6fc..0dc5e17 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
package org.apache.asterix.runtime.aggregates.serializable.std;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override
- public IFunctionDescriptor createFunctionDescriptor() {
- return new SerializableSqlSumAggregateDescriptor();
- }
- };
+ public static final IFunctionDescriptorFactory FACTORY = SerializableSqlSumAggregateDescriptor::new;
@Override
public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableA
@Override
public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SerializableSqlSumAggregateFunction(args, false, ctx, sourceLoc);
+ return new SerializableSqlSumAggregateFunction(args, ctx, sourceLoc);
}
};
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 38033f0..3062a37 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -18,54 +18,62 @@
*/
package org.apache.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
-import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
- private final boolean isLocalAgg;
- public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
- IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException {
+ public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
super(args, context, sourceLoc);
- this.isLocalAgg = isLocalAgg;
}
+ // Called for each incoming tuple; delegates unchanged to the shared step logic of the parent class
+ @Override
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Global stage: the result is emitted through the parent's finish(), not finishPartial()
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finish(state, start, len, out);
+ }
+
+ // Skip check: always false for this variant (presumably consulted by the parent's step() -- confirm there)
+ @Override
+ protected boolean skipStep(byte[] state, int start) {
+ return false;
+ }
+
+ // NULL input: the aggregation state is deliberately left untouched
@Override
protected void processNull(byte[] state, int start) {
+ // Do nothing
}
+ // SYSTEM_NULL input: ignored by the global aggregate (the local-only rejection now lives in the local class)
@Override
- protected void processSystemNull() throws HyracksDataException {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SQL_SUM,
- ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
- }
+ protected void processSystemNull() {
+ // Do nothing
}
- @SuppressWarnings("unchecked")
+ // NULL result: writes only the serialized NULL type tag
@Override
- protected void finishSystemNull(DataOutput out) throws IOException {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
}
+ // SYSTEM_NULL result: the global aggregate writes the NULL type tag here, not the SYSTEM_NULL tag
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index 43eea5b..7619ca8 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -19,7 +19,6 @@
package org.apache.asterix.runtime.aggregates.serializable.std;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.functions.IFunctionDescriptor;
import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
@@ -32,12 +31,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
- public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
- @Override
- public IFunctionDescriptor createFunctionDescriptor() {
- return new SerializableSumAggregateDescriptor();
- }
- };
+ public static final IFunctionDescriptorFactory FACTORY = SerializableSumAggregateDescriptor::new;
@Override
public FunctionIdentifier getIdentifier() {
@@ -53,7 +47,7 @@ public class SerializableSumAggregateDescriptor extends AbstractSerializableAggr
@Override
public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
throws HyracksDataException {
- return new SerializableSumAggregateFunction(args, false, ctx, sourceLoc);
+ return new SerializableSumAggregateFunction(args, ctx, sourceLoc);
}
};
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index 278914f..0be49e7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -18,62 +18,64 @@
*/
package org.apache.asterix.runtime.aggregates.serializable.std;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
-import org.apache.asterix.om.base.ANull;
-import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+
+import java.io.DataOutput;
+import java.io.IOException;
public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
- private final boolean isLocalAgg;
- public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
- IHyracksTaskContext context, SourceLocation sourceLoc) throws HyracksDataException {
+ public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
+ SourceLocation sourceLoc) throws HyracksDataException {
super(args, context, sourceLoc);
- this.isLocalAgg = isLocalAgg;
}
+ // Called for each incoming tuple
@Override
- protected void processNull(byte[] state, int start) {
- state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
+ public void step(IFrameTupleReference tuple, byte[] state, int start, int len) throws HyracksDataException {
+ super.step(tuple, state, start, len);
+ }
+
+ // Finish calculation
+ @Override
+ public void finish(byte[] state, int start, int len, DataOutput out) throws HyracksDataException {
+ super.finish(state, start, len, out);
}
+ // Skip the step once the aggregate state has already become NULL
@Override
protected boolean skipStep(byte[] state, int start) {
ATypeTag aggType = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(state[start + AGG_TYPE_OFFSET]);
return aggType == ATypeTag.NULL;
}
+ // Handle NULL step
@Override
- protected void processSystemNull() throws HyracksDataException {
- // For global aggregates simply ignore system null here,
- // but if all input value are system null, then we should return
- // null in finish().
- if (isLocalAgg) {
- throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM,
- ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
- }
+ protected void processNull(byte[] state, int start) {
+ state[start + AGG_TYPE_OFFSET] = ATypeTag.SERIALIZED_NULL_TYPE_TAG;
}
- @SuppressWarnings("unchecked")
+ // Handle SYSTEM_NULL step
@Override
- protected void finishSystemNull(DataOutput out) throws IOException {
- // Empty stream. For local agg return system null. For global agg return null.
- if (isLocalAgg) {
- out.writeByte(ATypeTag.SYSTEM_NULL.serialize());
- } else {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
- }
+ protected void processSystemNull() {
+ // Do nothing
+ }
+
+ // Handle NULL finish
+ @Override
+ protected void finishNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
}
+ // Handle SYSTEM_NULL finish
+ @Override
+ protected void finishSystemNull(DataOutput out) throws IOException {
+ out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5af85d9e/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index 037e307..836c24e 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -18,29 +18,18 @@
*/
package org.apache.asterix.runtime.aggregates.std;
-import java.io.IOException;
-
-import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
-import org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.dataflow.data.nontagged.serde.*;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.om.base.AMutableDouble;
-import org.apache.asterix.om.base.AMutableFloat;
-import org.apache.asterix.om.base.AMutableInt16;
-import org.apache.asterix.om.base.AMutableInt32;
import org.apache.asterix.om.base.AMutableInt64;
-import org.apache.asterix.om.base.AMutableInt8;
-import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.functions.BuiltinFunctions;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
-import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
-import org.apache.asterix.runtime.exceptions.IncompatibleTypeException;
+import org.apache.asterix.runtime.exceptions.OverflowException;
import org.apache.asterix.runtime.exceptions.UnsupportedItemTypeException;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -52,91 +41,119 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import java.io.IOException;
+
public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunction {
+
+ // Handles evaluating and storing/passing serialized data
protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
private IPointable inputVal = new VoidPointable();
private IScalarEvaluator eval;
- private double sum;
+
+ // Aggregate type
protected ATypeTag aggType;
- private AMutableDouble aDouble = new AMutableDouble(0);
- private AMutableFloat aFloat = new AMutableFloat(0);
+
+ // Result holders
+ private long sumInt64;
+ private double sumDouble;
private AMutableInt64 aInt64 = new AMutableInt64(0);
- private AMutableInt32 aInt32 = new AMutableInt32(0);
- private AMutableInt16 aInt16 = new AMutableInt16((short) 0);
- private AMutableInt8 aInt8 = new AMutableInt8((byte) 0);
+ private AMutableDouble aDouble = new AMutableDouble(0);
+
+ // Output type flag: when true the result is written as int64, otherwise as double
+ private boolean isUseInt64ForResult = true;
+
+ // Serializer/Deserializer
@SuppressWarnings("rawtypes")
- protected ISerializerDeserializer serde;
+ private ISerializerDeserializer aInt64Serde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
+ @SuppressWarnings("rawtypes")
+ private ISerializerDeserializer aDoubleSerde =
+ SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
+ // Constructor
public AbstractSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context,
SourceLocation sourceLoc) throws HyracksDataException {
super(sourceLoc);
eval = args[0].createScalarEvaluator(context);
}
+ // Abstract methods
+ protected abstract boolean skipStep(); // Skip step
+
+ protected abstract void processNull(); // Handle NULL step
+
+ protected abstract void processSystemNull() throws HyracksDataException; // Handle SYSTEM_NULL step
+
+ protected abstract void finishNull(IPointable result) throws IOException; // Handle NULL finish
+
+ protected abstract void finishSystemNull(IPointable result) throws IOException; // Handle SYSTEM_NULL finish
+
+ // Init the values
@Override
public void init() throws HyracksDataException {
aggType = ATypeTag.SYSTEM_NULL;
- sum = 0.0;
+ sumInt64 = 0;
+ sumDouble = 0.0;
}
+ // Called for each incoming tuple
@Override
public void step(IFrameTupleReference tuple) throws HyracksDataException {
+ // Skip current step
if (skipStep()) {
return;
}
+
+ // Evaluate/Get the data from the tuple
eval.evaluate(tuple, inputVal);
byte[] data = inputVal.getByteArray();
int offset = inputVal.getStartOffset();
+ // Get the data type tag
ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
+
+ // Handle MISSING and NULL values
if (typeTag == ATypeTag.MISSING || typeTag == ATypeTag.NULL) {
processNull();
return;
- } else if (aggType == ATypeTag.SYSTEM_NULL) {
- aggType = typeTag;
- } else if (typeTag != ATypeTag.SYSTEM_NULL && !ATypeHierarchy.isCompatible(typeTag, aggType)) {
- if (typeTag.ordinal() > aggType.ordinal()) {
- throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize(),
- aggType.serialize());
- } else {
- throw new IncompatibleTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize(),
- typeTag.serialize());
- }
}
-
- if (ATypeHierarchy.canPromote(aggType, typeTag)) {
+ // Value is neither MISSING nor NULL: if no aggregate type is set yet, adopt this value's type tag
+ else if (aggType == ATypeTag.SYSTEM_NULL) {
aggType = typeTag;
}
+ // Calculate based on the incoming data type + handles invalid data type
switch (typeTag) {
case TINYINT: {
byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
- sum += val;
+ processInt64Value(val);
break;
}
case SMALLINT: {
short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
- sum += val;
+ processInt64Value(val);
break;
}
case INTEGER: {
int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
- sum += val;
+ processInt64Value(val);
break;
}
case BIGINT: {
long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
- sum += val;
+ processInt64Value(val);
break;
}
case FLOAT: {
+ upgradeOutputType();
float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
- sum += val;
+ processFloatValue(val);
break;
}
case DOUBLE: {
+ upgradeOutputType();
double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
- sum += val;
+ processFloatValue(val);
break;
}
case SYSTEM_NULL: {
@@ -144,83 +161,95 @@ public abstract class AbstractSumAggregateFunction extends AbstractAggregateFunc
break;
}
default: {
- throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
+ throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, typeTag.serialize());
}
}
}
- @SuppressWarnings("unchecked")
- @Override
- public void finish(IPointable result) throws HyracksDataException {
- resultStorage.reset();
- try {
- switch (aggType) {
- case TINYINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
- aInt8.setValue((byte) sum);
- serde.serialize(aInt8, resultStorage.getDataOutput());
- break;
- }
- case SMALLINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
- aInt16.setValue((short) sum);
- serde.serialize(aInt16, resultStorage.getDataOutput());
- break;
- }
- case INTEGER: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
- aInt32.setValue((int) sum);
- serde.serialize(aInt32, resultStorage.getDataOutput());
- break;
- }
- case BIGINT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
- aInt64.setValue((long) sum);
- serde.serialize(aInt64, resultStorage.getDataOutput());
- break;
- }
- case FLOAT: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
- aFloat.setValue((float) sum);
- serde.serialize(aFloat, resultStorage.getDataOutput());
- break;
- }
- case DOUBLE: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
- aDouble.setValue(sum);
- serde.serialize(aDouble, resultStorage.getDataOutput());
- break;
- }
- case NULL: {
- serde = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, resultStorage.getDataOutput());
- break;
- }
- case SYSTEM_NULL: {
- finishSystemNull();
- break;
- }
- default:
- throw new UnsupportedItemTypeException(sourceLoc, BuiltinFunctions.SUM, aggType.serialize());
+ // Upgrade the output type
+ private void upgradeOutputType() {
+ isUseInt64ForResult = false;
+ }
+
+ // Process int64 value
+ private void processInt64Value(long value) throws HyracksDataException {
+ // Check the output flag first
+ if (!isUseInt64ForResult) {
+ processFloatValue(value);
+ }
+ // Int64 output, watch out for overflow exception
+ else {
+ try {
+ sumInt64 = Math.addExact(sumInt64, value);
+ sumDouble = sumInt64; // Keep the sumDouble variable up-to-date as well
+ } catch (ArithmeticException ignored) {
+ throw new OverflowException(sourceLoc, getIdentifier());
}
- } catch (IOException e) {
- throw HyracksDataException.create(e);
}
- result.set(resultStorage);
}
+ // Process float value
+ private void processFloatValue(double value) {
+ // If this method is called, it means the output is going to be a double, no need to check the output type flag
+ // and the sumInt64 can be ignored
+ sumDouble += value;
+ }
+
+ // Called for partial calculations
+ @SuppressWarnings("unchecked")
@Override
public void finishPartial(IPointable result) throws HyracksDataException {
- finish(result);
+ // finishPartial() has identical behavior to finish()
+ finishFinal(result);
}
- protected boolean skipStep() {
- return false;
+ // Called for final calculations
+ @SuppressWarnings("unchecked")
+ @Override
+ public void finish(IPointable result) throws HyracksDataException {
+ // Finish
+ finishFinal(result);
}
- protected abstract void processNull();
+ // Shared implementation behind finish() and finishPartial(): serializes the aggregate result
+ @SuppressWarnings("unchecked")
+ private void finishFinal(IPointable result) throws HyracksDataException {
+ // Reset the result storage
+ resultStorage.reset();
+
+ try {
+ // aggType is SYSTEM_NULL
+ if (aggType == ATypeTag.SYSTEM_NULL) {
+ if (GlobalConfig.DEBUG) {
+ GlobalConfig.ASTERIX_LOGGER.trace("SUM aggregate ran over zero values.");
+ }
- protected abstract void processSystemNull() throws HyracksDataException;
+ finishSystemNull(result);
+ }
+ // aggType is NULL
+ else if (aggType == ATypeTag.NULL) {
+ finishNull(result);
+ }
+ // Pass the result
+ else {
+ // Output type based on the flag
+ if (isUseInt64ForResult) {
+ aInt64.setValue(sumInt64);
+ aInt64Serde.serialize(aInt64, resultStorage.getDataOutput());
+ result.set(resultStorage);
+ } else {
+ aDouble.setValue(sumDouble);
+ aDoubleSerde.serialize(aDouble, resultStorage.getDataOutput());
+ result.set(resultStorage);
+ }
+ }
+ } catch (IOException ex) {
+ throw HyracksDataException.create(ex);
+ }
+ }
- protected abstract void finishSystemNull() throws IOException;
+ // Function identifier
+ private FunctionIdentifier getIdentifier() {
+ return BuiltinFunctions.SUM;
+ }
}