You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:15 UTC
[35/50] [abbrv] git commit: Added
Added
Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/99a4555a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/99a4555a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/99a4555a
Branch: refs/heads/prestonc/hash_join
Commit: 99a4555a505c21b767aa0476ef448f58ecc6581e
Parents: 32f6b97
Author: Preston Carman <pr...@apache.org>
Authored: Mon Mar 17 18:10:42 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700
----------------------------------------------------------------------
.../rules/IntroduceTwoStepAggregateRule.java | 58 +++++----
.../vxquery/functions/builtin-operators.xml | 14 +++
.../AvgGlobalAggregateEvaluatorFactory.java | 117 +++++++++++++++++++
.../AvgLocalAggregateEvaluatorFactory.java | 104 +++++++++++++++++
4 files changed, 263 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
index 29cf34f..d6f302c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
@@ -16,10 +16,15 @@
*/
package org.apache.vxquery.compiler.rewriter.rules;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -27,6 +32,8 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -50,6 +57,9 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
* if (af1 == count) aggregate operating settings:
* Step 1: count
* Step 2: sum
+ * if (af1 == avg) aggregate operating settings:
+ * Step 1: avg-local
+ * Step 2: avg-global
* if (af1 in (max, min, sum)) aggregate operating settings:
* Step 1: af1
* Step 2: af1
@@ -58,6 +68,21 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
* @author prestonc
*/
public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule {
+ final Map<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>> AGGREGATE_MAP = new HashMap<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>>();
+
+ public IntroduceTwoStepAggregateRule() {
+ AGGREGATE_MAP.put(BuiltinFunctions.FN_AVG_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+ BuiltinOperators.AVG_LOCAL, BuiltinOperators.AVG_GLOBAL));
+ AGGREGATE_MAP.put(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+ BuiltinFunctions.FN_COUNT_1, BuiltinFunctions.FN_SUM_1));
+ AGGREGATE_MAP.put(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+ BuiltinFunctions.FN_MAX_1, BuiltinFunctions.FN_MAX_1));
+ AGGREGATE_MAP.put(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+ BuiltinFunctions.FN_MIN_1, BuiltinFunctions.FN_MIN_1));
+ AGGREGATE_MAP.put(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+ BuiltinFunctions.FN_SUM_1, BuiltinFunctions.FN_SUM_1));
+ }
+
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
// Check if aggregate function.
@@ -74,41 +99,14 @@ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule {
}
AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
- if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier())) {
- AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
- if (aggregateFunctionCall.isTwoStep()) {
- return false;
- }
- aggregateFunctionCall.setTwoStep(true);
- aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_COUNT_1);
- aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1);
- return true;
- } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier())) {
- AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
- if (aggregateFunctionCall.isTwoStep()) {
- return false;
- }
- aggregateFunctionCall.setTwoStep(true);
- aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MAX_1);
- aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MAX_1);
- return true;
- } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier())) {
- AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
- if (aggregateFunctionCall.isTwoStep()) {
- return false;
- }
- aggregateFunctionCall.setTwoStep(true);
- aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MIN_1);
- aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MIN_1);
- return true;
- } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier())) {
+ if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) {
AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
if (aggregateFunctionCall.isTwoStep()) {
return false;
}
aggregateFunctionCall.setTwoStep(true);
- aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_SUM_1);
- aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1);
+ aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first);
+ aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second);
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
index 6a6c28d..ecf7542 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
@@ -515,6 +515,20 @@
<return type="xs:boolean"/>
</operator>
+ <!-- fn:avg-local($arg as xs:anyAtomicType*) as xs:anyAtomicType? -->
+ <operator name="opext:avg-local">
+ <param name="arg" type="xs:anyAtomicType*"/>
+ <return type="xs:anyAtomicType?"/>
+ <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgLocalAggregateEvaluatorFactory"/>
+ </operator>
+
+ <!-- fn:avg-global($arg as xs:anyAtomicType*) as xs:anyAtomicType? -->
+ <operator name="opext:avg-global">
+ <param name="arg" type="xs:anyAtomicType*"/>
+ <return type="xs:anyAtomicType?"/>
+ <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgGlobalAggregateEvaluatorFactory"/>
+ </operator>
+
<!-- opext:ordered($arg as item()*) as item()* -->
<operator name="opext:ordered">
<param name="arg" type="item()*"/>
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..61ae69e
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.aggregate;
+
+import java.io.DataOutput;
+
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.arithmetic.AddOperation;
+import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory;
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class AvgGlobalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public AvgGlobalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ super(args);
+ }
+
+ @Override
+ protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException {
+ final ArrayBackedValueStorage abvsCount = new ArrayBackedValueStorage();
+ final DataOutput dOutCount = abvsCount.getDataOutput();
+ final ArrayBackedValueStorage abvsSum = new ArrayBackedValueStorage();
+ final DataOutput dOutSum = abvsSum.getDataOutput();
+ final AddOperation aOp = new AddOperation();
+ final DivideOperation aOpDivide = new DivideOperation();
+ final LongPointable longp = (LongPointable) LongPointable.FACTORY.createPointable();
+ final SequencePointable seq = (SequencePointable) SequencePointable.FACTORY.createPointable();
+ final TaggedValuePointable tvpArg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+ return new AbstractTaggedValueArgumentAggregateEvaluator(args) {
+ TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+ @Override
+ public void init() throws AlgebricksException {
+ try {
+ abvsCount.reset();
+ dOutCount.write(ValueTag.XS_INTEGER_TAG);
+ dOutCount.writeLong(0);
+ tvpCount.set(abvsCount);
+ abvsSum.reset();
+ dOutSum.write(ValueTag.XS_INTEGER_TAG);
+ dOutSum.writeLong(0);
+ tvpSum.set(abvsSum);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+
+ @Override
+ public void finish(IPointable result) throws AlgebricksException {
+ tvpCount.getValue(longp);
+ if (longp.getLong() == 0) {
+ XDMConstants.setEmptySequence(result);
+ } else {
+ // Set count as a TaggedValuePointable.
+ try {
+ FunctionHelper.arithmeticOperation(aOpDivide, dCtx, tvpSum, tvpCount, tvpSum);
+ result.set(tvpSum);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ @Override
+ protected void step(TaggedValuePointable[] args) throws SystemException {
+ TaggedValuePointable tvp = args[0];
+ if (tvp.getTag() == ValueTag.SEQUENCE_TAG) {
+ tvp.getValue(seq);
+ int seqLen = seq.getEntryCount();
+ if (seqLen == 0) {
+ // No results from nodes.
+ return;
+ } else if (seqLen == 2) {
+ seq.getEntry(0, tvpArg);
+ FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpCount, tvpCount);
+ seq.getEntry(1, tvpArg);
+ FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpSum, tvpSum);
+ } else {
+ throw new SystemException(ErrorCode.SYSE0001);
+ }
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..91657c6
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.aggregate;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.arithmetic.AddOperation;
+import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory;
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class AvgLocalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+
+ public AvgLocalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+ super(args);
+ }
+
+ @Override
+ protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException {
+ final TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+ final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+ final ArrayBackedValueStorage abvsSeq = new ArrayBackedValueStorage();
+ final SequenceBuilder sb = new SequenceBuilder();
+ final DataOutput dOut = abvs.getDataOutput();
+ final AddOperation aOp = new AddOperation();
+
+ return new AbstractTaggedValueArgumentAggregateEvaluator(args) {
+ long count;
+ TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+ @Override
+ public void init() throws AlgebricksException {
+ count = 0;
+ }
+
+ @Override
+ public void finish(IPointable result) throws AlgebricksException {
+ if (count == 0) {
+ XDMConstants.setEmptySequence(result);
+ } else {
+ // Set count as a TaggedValuePointable.
+ try {
+ abvs.reset();
+ dOut.write(ValueTag.XS_INTEGER_TAG);
+ dOut.writeLong(count);
+ tvpCount.set(abvs);
+
+ // Save intermediate result.
+ abvsSeq.reset();
+ sb.reset(abvsSeq);
+ sb.addItem(tvpCount);
+ sb.addItem(tvpSum);
+ sb.finish();
+ result.set(abvsSeq);
+ } catch (Exception e) {
+ throw new AlgebricksException(e);
+ }
+ }
+ }
+
+ @Override
+ protected void step(TaggedValuePointable[] args) throws SystemException {
+ TaggedValuePointable tvp = args[0];
+ if (count == 0) {
+ // Init.
+ tvpSum.set(tvp);
+ } else {
+ FunctionHelper.arithmeticOperation(aOp, dCtx, tvp, tvpSum, tvpSum);
+ }
+ count++;
+ }
+ };
+ }
+}
\ No newline at end of file