You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by bu...@apache.org on 2016/02/13 03:15:51 UTC
[22/24] incubator-asterixdb git commit: Move to non-copy-based
evaluator interfaces for all function implementations,
including: - scalar functions, - aggregate functions,
- running aggregate functions, - unnesting functions
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
index 88585fb..b7507b1 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateDescriptor.java
@@ -25,14 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
-public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableGlobalSqlAvgAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableGlobalSqlAvgAggregateDescriptor();
}
@@ -44,14 +47,15 @@ public class SerializableGlobalSqlAvgAggregateDescriptor extends AbstractSeriali
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableGlobalSqlAvgAggregateFunction(args);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableGlobalSqlAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
index d2cc167..ebd8e89 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableGlobalSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
import java.io.DataOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableGlobalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableGlobalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableGlobalSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
index 1956c7d..e9920e2 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateDescriptor.java
@@ -25,15 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
-public class SerializableIntermediateAvgAggregateDescriptor extends
- AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableIntermediateAvgAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableIntermediateAvgAggregateDescriptor();
}
@@ -45,13 +47,15 @@ public class SerializableIntermediateAvgAggregateDescriptor extends
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableIntermediateAvgAggregateFunction(args);
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableIntermediateAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
index af21c9f..3ea3d76 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateAvgAggregateFunction.java
@@ -24,13 +24,15 @@ import java.io.DataOutput;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableIntermediateAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableIntermediateAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableIntermediateAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
index 91f6f4e..aa4d699 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateDescriptor.java
@@ -25,14 +25,17 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
-public class SerializableIntermediateSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
+public class SerializableIntermediateSqlAvgAggregateDescriptor
+ extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableIntermediateSqlAvgAggregateDescriptor();
}
@@ -44,14 +47,15 @@ public class SerializableIntermediateSqlAvgAggregateDescriptor extends AbstractS
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableIntermediateSqlAvgAggregateFunction(args);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableIntermediateSqlAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
index 920dae2..bd58214 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableIntermediateSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
import java.io.DataOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableIntermediateSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableIntermediateSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableIntermediateSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
index a29c37a..8fd080a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateDescriptor.java
@@ -25,14 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalAvgAggregateDescriptor();
}
@@ -44,13 +46,15 @@ public class SerializableLocalAvgAggregateDescriptor extends AbstractSerializabl
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableLocalAvgAggregateFunction(args);
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableLocalAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
index 1b7772f..c15a937 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalAvgAggregateFunction.java
@@ -24,13 +24,15 @@ import java.io.DataOutput;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableLocalAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableLocalAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableLocalAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
index b31e69f..f96a053 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateDescriptor.java
@@ -25,14 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSqlAvgAggregateDescriptor();
}
@@ -44,13 +46,15 @@ public class SerializableLocalSqlAvgAggregateDescriptor extends AbstractSerializ
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableLocalSqlAvgAggregateFunction(args);
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableLocalSqlAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
index e2f40e5..3609050 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlAvgAggregateFunction.java
@@ -22,13 +22,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
import java.io.DataOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableLocalSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableLocalSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableLocalSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
index 3b8bf0a..4392e6e 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSqlSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSqlSumAggregateDescriptor();
}
@@ -43,14 +45,15 @@ public class SerializableLocalSqlSumAggregateDescriptor extends AbstractSerializ
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSqlSumAggregateFunction(args, true);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSqlSumAggregateFunction(args, true, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
index b2fab9c..e610077 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableLocalSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableLocalSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableLocalSumAggregateDescriptor();
}
@@ -43,14 +45,15 @@ public class SerializableLocalSumAggregateDescriptor extends AbstractSerializabl
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSumAggregateFunction(args, true);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSumAggregateFunction(args, true, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
index 55151dc..ef1914a 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableSqlAvgAggregateDescriptor();
}
@@ -43,13 +45,15 @@ public class SerializableSqlAvgAggregateDescriptor extends AbstractSerializableA
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSqlAvgAggregateFunction(args);
+ @Override
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSqlAvgAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
index e7205ac..8b62efb 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlAvgAggregateFunction.java
@@ -21,13 +21,15 @@ package org.apache.asterix.runtime.aggregates.serializable.std;
import java.io.DataOutput;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SerializableSqlAvgAggregateFunction extends AbstractSerializableAvgAggregateFunction {
- public SerializableSqlAvgAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableSqlAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
index 1998e18..2a5bc53 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateDescriptor.java
@@ -24,9 +24,10 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
/**
* count(NULL) returns NULL.
@@ -35,6 +36,7 @@ public class SerializableSqlCountAggregateDescriptor extends AbstractSerializabl
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableSqlCountAggregateDescriptor();
}
@@ -46,14 +48,15 @@ public class SerializableSqlCountAggregateDescriptor extends AbstractSerializabl
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSqlCountAggregateFunction(args);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSqlCountAggregateFunction(args, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
index aaf9df7..24cd674 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlCountAggregateFunction.java
@@ -19,14 +19,16 @@
package org.apache.asterix.runtime.aggregates.serializable.std;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
/**
* count(NULL) returns NULL.
*/
public class SerializableSqlCountAggregateFunction extends AbstractSerializableCountAggregateFunction {
- public SerializableSqlCountAggregateFunction(ICopyEvaluatorFactory[] args) throws AlgebricksException {
- super(args);
+ public SerializableSqlCountAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
+ throws AlgebricksException {
+ super(args, context);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
index 9813ad7..dec2688 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableSqlSumAggregateDescriptor();
}
@@ -43,14 +45,15 @@ public class SerializableSqlSumAggregateDescriptor extends AbstractSerializableA
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSqlSumAggregateFunction(args, false);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSqlSumAggregateFunction(args, false, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
index 8d82dde..f34d1be 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSqlSumAggregateFunction.java
@@ -26,14 +26,15 @@ import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableSqlSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
private final boolean isLocalAgg;
- public SerializableSqlSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
- throws AlgebricksException {
- super(args);
+ public SerializableSqlSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
+ IHyracksTaskContext context) throws AlgebricksException {
+ super(args, context);
this.isLocalAgg = isLocalAgg;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
index 74717ed..8a7cdef 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateDescriptor.java
@@ -24,14 +24,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractSerializableAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopySerializableAggregateFunctionFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableSumAggregateDescriptor extends AbstractSerializableAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new SerializableSumAggregateDescriptor();
}
@@ -43,14 +45,15 @@ public class SerializableSumAggregateDescriptor extends AbstractSerializableAggr
}
@Override
- public ICopySerializableAggregateFunctionFactory createSerializableAggregateFunctionFactory(
- final ICopyEvaluatorFactory[] args) throws AlgebricksException {
- return new ICopySerializableAggregateFunctionFactory() {
+ public ISerializedAggregateEvaluatorFactory createSerializableAggregateEvaluatorFactory(
+ final IScalarEvaluatorFactory[] args) throws AlgebricksException {
+ return new ISerializedAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopySerializableAggregateFunction createAggregateFunction() throws AlgebricksException {
- return new SerializableSumAggregateFunction(args, false);
+ public ISerializedAggregateEvaluator createAggregateEvaluator(IHyracksTaskContext ctx)
+ throws AlgebricksException {
+ return new SerializableSumAggregateFunction(args, false, ctx);
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
index ef0dc3b..e5190ae 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/serializable/std/SerializableSumAggregateFunction.java
@@ -27,14 +27,15 @@ import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class SerializableSumAggregateFunction extends AbstractSerializableSumAggregateFunction {
private final boolean isLocalAgg;
- public SerializableSumAggregateFunction(ICopyEvaluatorFactory[] args, boolean isLocalAgg)
- throws AlgebricksException {
- super(args);
+ public SerializableSumAggregateFunction(IScalarEvaluatorFactory[] args, boolean isLocalAgg,
+ IHyracksTaskContext context) throws AlgebricksException {
+ super(args, context);
this.isLocalAgg = isLocalAgg;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
index 37d4b05..a57aacd 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractAvgAggregateFunction.java
@@ -46,37 +46,39 @@ import org.apache.asterix.runtime.evaluators.common.AccessibleByteArrayEval;
import org.apache.asterix.runtime.evaluators.common.ClosedRecordConstructorEvalFactory.ClosedRecordConstructorEval;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.data.std.util.ByteArrayAccessibleOutputStream;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunction {
+public abstract class AbstractAvgAggregateFunction implements IAggregateEvaluator {
private static final int SUM_FIELD_ID = 0;
private static final int COUNT_FIELD_ID = 1;
private final ARecordType recType;
- private DataOutput out;
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private IPointable inputVal = new VoidPointable();
+ private IScalarEvaluator eval;
protected ATypeTag aggType;
private double sum;
private long count;
private AMutableDouble aDouble = new AMutableDouble(0);
private AMutableInt64 aInt64 = new AMutableInt64(0);
- private ArrayBackedValueStorage avgBytes = new ArrayBackedValueStorage();
+ private IPointable avgBytes = new VoidPointable();
private ByteArrayAccessibleOutputStream sumBytes = new ByteArrayAccessibleOutputStream();
private DataOutput sumBytesOutput = new DataOutputStream(sumBytes);
private ByteArrayAccessibleOutputStream countBytes = new ByteArrayAccessibleOutputStream();
private DataOutput countBytesOutput = new DataOutputStream(countBytes);
- private ICopyEvaluator evalSum = new AccessibleByteArrayEval(avgBytes.getDataOutput(), sumBytes);
- private ICopyEvaluator evalCount = new AccessibleByteArrayEval(avgBytes.getDataOutput(), countBytes);
+ private IScalarEvaluator evalSum = new AccessibleByteArrayEval(sumBytes);
+ private IScalarEvaluator evalCount = new AccessibleByteArrayEval(countBytes);
private ClosedRecordConstructorEval recordEval;
@SuppressWarnings("unchecked")
@@ -89,15 +91,13 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
private ISerializerDeserializer<ANull> nullSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ANULL);
- public AbstractAvgAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ public AbstractAvgAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
throws AlgebricksException {
- eval = args[0].createEvaluator(inputVal);
- out = output.getDataOutput();
+ eval = args[0].createScalarEvaluator(context);
recType = new ARecordType(null, new String[] { "sum", "count" },
new IAType[] { BuiltinType.ADOUBLE, BuiltinType.AINT64 }, false);
- recordEval = new ClosedRecordConstructorEval(recType, new ICopyEvaluator[] { evalSum, evalCount }, avgBytes,
- out);
+ recordEval = new ClosedRecordConstructorEval(recType, new IScalarEvaluator[] { evalSum, evalCount });
}
@Override
@@ -111,10 +111,10 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
public abstract void step(IFrameTupleReference tuple) throws AlgebricksException;
@Override
- public abstract void finish() throws AlgebricksException;
+ public abstract void finish(IPointable result) throws AlgebricksException;
@Override
- public abstract void finishPartial() throws AlgebricksException;
+ public abstract void finishPartial(IPointable result) throws AlgebricksException;
protected abstract void processNull();
@@ -122,9 +122,11 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
if (skipStep()) {
return;
}
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ eval.evaluate(tuple, inputVal);
+ byte[] data = inputVal.getByteArray();
+ int offset = inputVal.getStartOffset();
+
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
if (typeTag == ATypeTag.NULL) {
processNull();
return;
@@ -139,32 +141,32 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
++count;
switch (typeTag) {
case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
sum += val;
break;
}
case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
sum += val;
break;
}
case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
sum += val;
break;
}
case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
sum += val;
break;
}
case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
sum += val;
break;
}
case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
sum += val;
break;
}
@@ -172,19 +174,21 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
throw new NotImplementedException("Cannot compute AVG for values of type " + typeTag);
}
}
- inputVal.reset();
}
- protected void finishPartialResults() throws AlgebricksException {
+ protected void finishPartialResults(IPointable result) throws AlgebricksException {
+ resultStorage.reset();
try {
// Double check that count 0 is accounted
if (aggType == ATypeTag.SYSTEM_NULL) {
if (GlobalConfig.DEBUG) {
GlobalConfig.ASTERIX_LOGGER.finest("AVG aggregate ran over empty input.");
}
- out.writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_SYSTEM_NULL_TYPE_TAG);
+ result.set(resultStorage);
} else if (aggType == ATypeTag.NULL) {
- out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
} else {
sumBytes.reset();
aDouble.setValue(sum);
@@ -192,7 +196,8 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
countBytes.reset();
aInt64.setValue(count);
longSerde.serialize(aInt64, countBytesOutput);
- recordEval.evaluate(null);
+ recordEval.evaluate(null, avgBytes);
+ result.set(avgBytes);
}
} catch (IOException e) {
throw new AlgebricksException(e);
@@ -203,10 +208,10 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
if (skipStep()) {
return;
}
- inputVal.reset();
- eval.evaluate(tuple);
+ eval.evaluate(tuple, inputVal);
byte[] serBytes = inputVal.getByteArray();
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[0]);
+ int offset = inputVal.getStartOffset();
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(serBytes[offset]);
switch (typeTag) {
case NULL: {
processNull();
@@ -220,11 +225,11 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
// Expected.
aggType = ATypeTag.DOUBLE;
int nullBitmapSize = 0;
- int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, SUM_FIELD_ID, nullBitmapSize,
- false);
+ int offset1 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, SUM_FIELD_ID,
+ nullBitmapSize, false);
sum += ADoubleSerializerDeserializer.getDouble(serBytes, offset1);
- int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, COUNT_FIELD_ID, nullBitmapSize,
- false);
+ int offset2 = ARecordSerializerDeserializer.getFieldOffsetById(serBytes, offset, COUNT_FIELD_ID,
+ nullBitmapSize, false);
count += AInt64SerializerDeserializer.getLong(serBytes, offset2);
break;
}
@@ -235,17 +240,19 @@ public abstract class AbstractAvgAggregateFunction implements ICopyAggregateFunc
}
}
- protected void finishFinalResults() throws AlgebricksException {
+ protected void finishFinalResults(IPointable result) throws AlgebricksException {
+ resultStorage.reset();
try {
if (count == 0 || aggType == ATypeTag.NULL) {
- nullSerde.serialize(ANull.NULL, out);
+ nullSerde.serialize(ANull.NULL, resultStorage.getDataOutput());
} else {
aDouble.setValue(sum / count);
- doubleSerde.serialize(aDouble, out);
+ doubleSerde.serialize(aDouble, resultStorage.getDataOutput());
}
} catch (IOException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
}
protected boolean skipStep() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
index 9523d90..f93617d 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractCountAggregateFunction.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
import java.io.IOException;
import org.apache.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
@@ -28,31 +27,33 @@ import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
/**
* COUNT returns the number of items in the given list. Note that COUNT(NULL) is not allowed.
*/
-public abstract class AbstractCountAggregateFunction implements ICopyAggregateFunction {
+public abstract class AbstractCountAggregateFunction implements IAggregateEvaluator {
private AMutableInt64 result = new AMutableInt64(-1);
@SuppressWarnings("unchecked")
private ISerializerDeserializer<AInt64> int64Serde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.AINT64);
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
+ private IPointable inputVal = new VoidPointable();
+ private IScalarEvaluator eval;
protected long cnt;
- private DataOutput out;
- public AbstractCountAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
+ private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+
+ public AbstractCountAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
throws AlgebricksException {
- eval = args[0].createEvaluator(inputVal);
- out = output.getDataOutput();
+ eval = args[0].createScalarEvaluator(context);
}
@Override
@@ -62,9 +63,9 @@ public abstract class AbstractCountAggregateFunction implements ICopyAggregateFu
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ eval.evaluate(tuple, inputVal);
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[inputVal.getStartOffset()]);
// Ignore SYSTEM_NULL.
if (typeTag == ATypeTag.NULL) {
processNull();
@@ -74,18 +75,20 @@ public abstract class AbstractCountAggregateFunction implements ICopyAggregateFu
}
@Override
- public void finish() throws AlgebricksException {
+ public void finish(IPointable resultPointable) throws AlgebricksException {
+ resultStorage.reset();
try {
result.setValue(cnt);
- int64Serde.serialize(result, out);
+ int64Serde.serialize(result, resultStorage.getDataOutput());
} catch (IOException e) {
throw new AlgebricksException(e);
}
+ resultPointable.set(resultStorage);
}
@Override
- public void finishPartial() throws AlgebricksException {
- finish();
+ public void finishPartial(IPointable resultPointable) throws AlgebricksException {
+ finish(resultPointable);
}
protected abstract void processNull();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index e934ddb..c5a0104 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
import java.io.IOException;
import org.apache.asterix.formats.nontagged.AqlBinaryComparatorFactoryProvider;
@@ -27,46 +26,48 @@ import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.asterix.om.types.hierachy.ITypeConvertComputer;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateFunction {
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
+public abstract class AbstractMinMaxAggregateFunction implements IAggregateEvaluator {
+ private IPointable inputVal = new VoidPointable();
private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
private ArrayBackedValueStorage tempValForCasting = new ArrayBackedValueStorage();
- protected DataOutput out;
- private ICopyEvaluator eval;
+
+ protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private IScalarEvaluator eval;
protected ATypeTag aggType;
private IBinaryComparator cmp;
private ITypeConvertComputer tpc;
private final boolean isMin;
- public AbstractMinMaxAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider, boolean isMin)
+ public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context, boolean isMin)
throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
+ eval = args[0].createScalarEvaluator(context);
this.isMin = isMin;
}
@Override
public void init() {
aggType = ATypeTag.SYSTEM_NULL;
- outputVal.reset();
tempValForCasting.reset();
}
@Override
public void step(IFrameTupleReference tuple) throws AlgebricksException {
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ eval.evaluate(tuple, inputVal);
+
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+ .deserialize(inputVal.getByteArray()[inputVal.getStartOffset()]);
if (typeTag == ATypeTag.NULL) {
processNull();
return;
@@ -110,7 +111,6 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
} catch (IOException e) {
throw new AlgebricksException(e);
}
- outputVal.reset();
outputVal.assign(tempValForCasting);
}
try {
@@ -157,19 +157,22 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
}
@Override
- public void finish() throws AlgebricksException {
+ public void finish(IPointable result) throws AlgebricksException {
+ resultStorage.reset();
try {
switch (aggType) {
case NULL: {
- out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+ result.set(resultStorage);
break;
}
case SYSTEM_NULL: {
finishSystemNull();
+ result.set(resultStorage);
break;
}
default: {
- out.write(outputVal.getByteArray(), outputVal.getStartOffset(), outputVal.getLength());
+ result.set(outputVal);
break;
}
}
@@ -179,8 +182,8 @@ public abstract class AbstractMinMaxAggregateFunction implements ICopyAggregateF
}
@Override
- public void finishPartial() throws AlgebricksException {
- finish();
+ public void finishPartial(IPointable result) throws AlgebricksException {
+ finish(result);
}
protected boolean skipStep() {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
index fcf1850..2948887 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractSumAggregateFunction.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.runtime.aggregates.std;
-import java.io.DataOutput;
import java.io.IOException;
import org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
@@ -41,18 +40,20 @@ import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.hierachy.ATypeHierarchy;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunction {
- protected DataOutput out;
- private ArrayBackedValueStorage inputVal = new ArrayBackedValueStorage();
- private ICopyEvaluator eval;
+public abstract class AbstractSumAggregateFunction implements IAggregateEvaluator {
+ protected ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage();
+ private IPointable inputVal = new VoidPointable();
+ private IScalarEvaluator eval;
private double sum;
protected ATypeTag aggType;
private AMutableDouble aDouble = new AMutableDouble(0);
@@ -64,10 +65,9 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
@SuppressWarnings("rawtypes")
protected ISerializerDeserializer serde;
- public AbstractSumAggregateFunction(ICopyEvaluatorFactory[] args, IDataOutputProvider provider)
+ public AbstractSumAggregateFunction(IScalarEvaluatorFactory[] args, IHyracksTaskContext context)
throws AlgebricksException {
- out = provider.getDataOutput();
- eval = args[0].createEvaluator(inputVal);
+ eval = args[0].createScalarEvaluator(context);
}
@Override
@@ -81,9 +81,11 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
if (skipStep()) {
return;
}
- inputVal.reset();
- eval.evaluate(tuple);
- ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(inputVal.getByteArray()[0]);
+ eval.evaluate(tuple, inputVal);
+ byte[] data = inputVal.getByteArray();
+ int offset = inputVal.getStartOffset();
+
+ ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(data[offset]);
if (typeTag == ATypeTag.NULL) {
processNull();
return;
@@ -100,32 +102,32 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
switch (typeTag) {
case INT8: {
- byte val = AInt8SerializerDeserializer.getByte(inputVal.getByteArray(), 1);
+ byte val = AInt8SerializerDeserializer.getByte(data, offset + 1);
sum += val;
break;
}
case INT16: {
- short val = AInt16SerializerDeserializer.getShort(inputVal.getByteArray(), 1);
+ short val = AInt16SerializerDeserializer.getShort(data, offset + 1);
sum += val;
break;
}
case INT32: {
- int val = AInt32SerializerDeserializer.getInt(inputVal.getByteArray(), 1);
+ int val = AInt32SerializerDeserializer.getInt(data, offset + 1);
sum += val;
break;
}
case INT64: {
- long val = AInt64SerializerDeserializer.getLong(inputVal.getByteArray(), 1);
+ long val = AInt64SerializerDeserializer.getLong(data, offset + 1);
sum += val;
break;
}
case FLOAT: {
- float val = AFloatSerializerDeserializer.getFloat(inputVal.getByteArray(), 1);
+ float val = AFloatSerializerDeserializer.getFloat(data, offset + 1);
sum += val;
break;
}
case DOUBLE: {
- double val = ADoubleSerializerDeserializer.getDouble(inputVal.getByteArray(), 1);
+ double val = ADoubleSerializerDeserializer.getDouble(data, offset + 1);
sum += val;
break;
}
@@ -144,48 +146,49 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
@SuppressWarnings("unchecked")
@Override
- public void finish() throws AlgebricksException {
+ public void finish(IPointable result) throws AlgebricksException {
+ resultStorage.reset();
try {
switch (aggType) {
case INT8: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT8);
aInt8.setValue((byte) sum);
- serde.serialize(aInt8, out);
+ serde.serialize(aInt8, resultStorage.getDataOutput());
break;
}
case INT16: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT16);
aInt16.setValue((short) sum);
- serde.serialize(aInt16, out);
+ serde.serialize(aInt16, resultStorage.getDataOutput());
break;
}
case INT32: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT32);
aInt32.setValue((int) sum);
- serde.serialize(aInt32, out);
+ serde.serialize(aInt32, resultStorage.getDataOutput());
break;
}
case INT64: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AINT64);
aInt64.setValue((long) sum);
- serde.serialize(aInt64, out);
+ serde.serialize(aInt64, resultStorage.getDataOutput());
break;
}
case FLOAT: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AFLOAT);
aFloat.setValue((float) sum);
- serde.serialize(aFloat, out);
+ serde.serialize(aFloat, resultStorage.getDataOutput());
break;
}
case DOUBLE: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADOUBLE);
aDouble.setValue(sum);
- serde.serialize(aDouble, out);
+ serde.serialize(aDouble, resultStorage.getDataOutput());
break;
}
case NULL: {
serde = AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL);
- serde.serialize(ANull.NULL, out);
+ serde.serialize(ANull.NULL, resultStorage.getDataOutput());
break;
}
case SYSTEM_NULL: {
@@ -193,23 +196,27 @@ public abstract class AbstractSumAggregateFunction implements ICopyAggregateFunc
break;
}
default:
- throw new AlgebricksException("SumAggregationFunction: incompatible type for the result ("
- + aggType + "). ");
+ throw new AlgebricksException(
+ "SumAggregationFunction: incompatible type for the result (" + aggType + "). ");
}
} catch (IOException e) {
throw new AlgebricksException(e);
}
+ result.set(resultStorage);
}
@Override
- public void finishPartial() throws AlgebricksException {
- finish();
+ public void finishPartial(IPointable result) throws AlgebricksException {
+ finish(result);
}
protected boolean skipStep() {
return false;
}
+
protected abstract void processNull();
+
protected abstract void processSystemNull() throws AlgebricksException;
+
protected abstract void finishSystemNull() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/0ae30836/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
----------------------------------------------------------------------
diff --git a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
index 0c24ef1..2b5c9bb 100644
--- a/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
+++ b/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AvgAggregateDescriptor.java
@@ -25,15 +25,16 @@ import org.apache.asterix.om.functions.IFunctionDescriptorFactory;
import org.apache.asterix.runtime.aggregates.base.AbstractAggregateFunctionDynamicDescriptor;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunction;
-import org.apache.hyracks.algebricks.runtime.base.ICopyAggregateFunctionFactory;
-import org.apache.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
-import org.apache.hyracks.data.std.api.IDataOutputProvider;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDescriptor {
private static final long serialVersionUID = 1L;
public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() {
+ @Override
public IFunctionDescriptor createFunctionDescriptor() {
return new AvgAggregateDescriptor();
}
@@ -45,15 +46,15 @@ public class AvgAggregateDescriptor extends AbstractAggregateFunctionDynamicDesc
}
@Override
- public ICopyAggregateFunctionFactory createAggregateFunctionFactory(final ICopyEvaluatorFactory[] args)
+ public IAggregateEvaluatorFactory createAggregateEvaluatorFactory(final IScalarEvaluatorFactory[] args)
throws AlgebricksException {
- return new ICopyAggregateFunctionFactory() {
+ return new IAggregateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
- public ICopyAggregateFunction createAggregateFunction(final IDataOutputProvider provider)
+ public IAggregateEvaluator createAggregateEvaluator(final IHyracksTaskContext ctx)
throws AlgebricksException {
- return new AvgAggregateFunction(args, provider);
+ return new AvgAggregateFunction(args, ctx);
}
};
}