You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@vxquery.apache.org by pr...@apache.org on 2014/04/02 06:12:15 UTC

[35/50] [abbrv] git commit: Added

Added


Project: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/commit/99a4555a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/tree/99a4555a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-vxquery/diff/99a4555a

Branch: refs/heads/prestonc/hash_join
Commit: 99a4555a505c21b767aa0476ef448f58ecc6581e
Parents: 32f6b97
Author: Preston Carman <pr...@apache.org>
Authored: Mon Mar 17 18:10:42 2014 -0700
Committer: Preston Carman <pr...@apache.org>
Committed: Tue Apr 1 20:56:25 2014 -0700

----------------------------------------------------------------------
 .../rules/IntroduceTwoStepAggregateRule.java    |  58 +++++----
 .../vxquery/functions/builtin-operators.xml     |  14 +++
 .../AvgGlobalAggregateEvaluatorFactory.java     | 117 +++++++++++++++++++
 .../AvgLocalAggregateEvaluatorFactory.java      | 104 +++++++++++++++++
 4 files changed, 263 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
index 29cf34f..d6f302c 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
+++ b/vxquery-core/src/main/java/org/apache/vxquery/compiler/rewriter/rules/IntroduceTwoStepAggregateRule.java
@@ -16,10 +16,15 @@
  */
 package org.apache.vxquery.compiler.rewriter.rules;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.vxquery.functions.BuiltinFunctions;
+import org.apache.vxquery.functions.BuiltinOperators;
 
 import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.common.utils.Pair;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
@@ -27,6 +32,8 @@ import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
 import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AggregateFunctionCallExpression;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import edu.uci.ics.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
 import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
@@ -50,6 +57,9 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
  *   if (af1 == count) aggregate operating settings:
  *     Step 1: count
  *     Step 2: sum
+ *   if (af1 == avg) aggregate operating settings:
+ *     Step 1: avg-local
+ *     Step 2: avg-global
  *   if (af1 in (max, min, sum)) aggregate operating settings:
  *     Step 1: af1
  *     Step 2: af1
@@ -58,6 +68,21 @@ import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
  * @author prestonc
  */
 public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule {
+    final Map<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>> AGGREGATE_MAP = new HashMap<FunctionIdentifier, Pair<IFunctionInfo, IFunctionInfo>>();
+
+    public IntroduceTwoStepAggregateRule() {
+        AGGREGATE_MAP.put(BuiltinFunctions.FN_AVG_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+                BuiltinOperators.AVG_LOCAL, BuiltinOperators.AVG_GLOBAL));
+        AGGREGATE_MAP.put(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+                BuiltinFunctions.FN_COUNT_1, BuiltinFunctions.FN_SUM_1));
+        AGGREGATE_MAP.put(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+                BuiltinFunctions.FN_MAX_1, BuiltinFunctions.FN_MAX_1));
+        AGGREGATE_MAP.put(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+                BuiltinFunctions.FN_MIN_1, BuiltinFunctions.FN_MIN_1));
+        AGGREGATE_MAP.put(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier(), new Pair<IFunctionInfo, IFunctionInfo>(
+                BuiltinFunctions.FN_SUM_1, BuiltinFunctions.FN_SUM_1));
+    }
+
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
         // Check if aggregate function.
@@ -74,41 +99,14 @@ public class IntroduceTwoStepAggregateRule implements IAlgebraicRewriteRule {
         }
         AbstractFunctionCallExpression functionCall = (AbstractFunctionCallExpression) logicalExpression;
 
-        if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_COUNT_1.getFunctionIdentifier())) {
-            AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
-            if (aggregateFunctionCall.isTwoStep()) {
-                return false;
-            }
-            aggregateFunctionCall.setTwoStep(true);
-            aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_COUNT_1);
-            aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1);
-            return true;
-        } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MAX_1.getFunctionIdentifier())) {
-            AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
-            if (aggregateFunctionCall.isTwoStep()) {
-                return false;
-            }
-            aggregateFunctionCall.setTwoStep(true);
-            aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MAX_1);
-            aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MAX_1);
-            return true;
-        } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_MIN_1.getFunctionIdentifier())) {
-            AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
-            if (aggregateFunctionCall.isTwoStep()) {
-                return false;
-            }
-            aggregateFunctionCall.setTwoStep(true);
-            aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_MIN_1);
-            aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_MIN_1);
-            return true;
-        } else if (functionCall.getFunctionIdentifier().equals(BuiltinFunctions.FN_SUM_1.getFunctionIdentifier())) {
+        if (AGGREGATE_MAP.containsKey(functionCall.getFunctionIdentifier())) {
             AggregateFunctionCallExpression aggregateFunctionCall = (AggregateFunctionCallExpression) functionCall;
             if (aggregateFunctionCall.isTwoStep()) {
                 return false;
             }
             aggregateFunctionCall.setTwoStep(true);
-            aggregateFunctionCall.setStepOneAggregate(BuiltinFunctions.FN_SUM_1);
-            aggregateFunctionCall.setStepTwoAggregate(BuiltinFunctions.FN_SUM_1);
+            aggregateFunctionCall.setStepOneAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).first);
+            aggregateFunctionCall.setStepTwoAggregate(AGGREGATE_MAP.get(functionCall.getFunctionIdentifier()).second);
             return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
index 6a6c28d..ecf7542 100644
--- a/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
+++ b/vxquery-core/src/main/java/org/apache/vxquery/functions/builtin-operators.xml
@@ -515,6 +515,20 @@
         <return type="xs:boolean"/>
     </operator>
 
+    <!-- fn:avg-local($arg as xs:anyAtomicType*) as xs:anyAtomicType? -->
+    <operator name="opext:avg-local">
+        <param name="arg" type="xs:anyAtomicType*"/>
+        <return type="xs:anyAtomicType?"/>
+        <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgLocalAggregateEvaluatorFactory"/>
+    </operator>
+
+    <!-- fn:avg-global($arg as xs:anyAtomicType*) as xs:anyAtomicType? -->
+    <operator name="opext:avg-global">
+        <param name="arg" type="xs:anyAtomicType*"/>
+        <return type="xs:anyAtomicType?"/>
+        <runtime type="aggregate" class="org.apache.vxquery.runtime.functions.aggregate.AvgGlobalAggregateEvaluatorFactory"/>
+    </operator>
+
     <!-- opext:ordered($arg as item()*) as item()* -->
     <operator name="opext:ordered">
         <param name="arg" type="item()*"/>

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..61ae69e
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgGlobalAggregateEvaluatorFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.aggregate;
+
+import java.io.DataOutput;
+
+import org.apache.vxquery.datamodel.accessors.SequencePointable;
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.arithmetic.AddOperation;
+import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory;
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.primitive.LongPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class AvgGlobalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public AvgGlobalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        super(args);
+    }
+
+    @Override
+    protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException {
+        final ArrayBackedValueStorage abvsCount = new ArrayBackedValueStorage();
+        final DataOutput dOutCount = abvsCount.getDataOutput();
+        final ArrayBackedValueStorage abvsSum = new ArrayBackedValueStorage();
+        final DataOutput dOutSum = abvsSum.getDataOutput();
+        final AddOperation aOp = new AddOperation();
+        final DivideOperation aOpDivide = new DivideOperation();
+        final LongPointable longp = (LongPointable) LongPointable.FACTORY.createPointable();
+        final SequencePointable seq = (SequencePointable) SequencePointable.FACTORY.createPointable();
+        final TaggedValuePointable tvpArg = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+        return new AbstractTaggedValueArgumentAggregateEvaluator(args) {
+            TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+            TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+            @Override
+            public void init() throws AlgebricksException {
+                try {
+                    abvsCount.reset();
+                    dOutCount.write(ValueTag.XS_INTEGER_TAG);
+                    dOutCount.writeLong(0);
+                    tvpCount.set(abvsCount);
+                    abvsSum.reset();
+                    dOutSum.write(ValueTag.XS_INTEGER_TAG);
+                    dOutSum.writeLong(0);
+                    tvpSum.set(abvsSum);
+                } catch (Exception e) {
+                    throw new AlgebricksException(e);
+                }
+            }
+
+            @Override
+            public void finish(IPointable result) throws AlgebricksException {
+                tvpCount.getValue(longp);
+                if (longp.getLong() == 0) {
+                    XDMConstants.setEmptySequence(result);
+                } else {
+                    // Set count as a TaggedValuePointable.
+                    try {
+                        FunctionHelper.arithmeticOperation(aOpDivide, dCtx, tvpSum, tvpCount, tvpSum);
+                        result.set(tvpSum);
+                    } catch (Exception e) {
+                        throw new AlgebricksException(e);
+                    }
+                }
+            }
+
+            @Override
+            protected void step(TaggedValuePointable[] args) throws SystemException {
+                TaggedValuePointable tvp = args[0];
+                if (tvp.getTag() == ValueTag.SEQUENCE_TAG) {
+                    tvp.getValue(seq);
+                    int seqLen = seq.getEntryCount();
+                    if (seqLen == 0) {
+                        // No results from nodes.
+                        return;
+                    } else if (seqLen == 2) {
+                        seq.getEntry(0, tvpArg);
+                        FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpCount, tvpCount);
+                        seq.getEntry(1, tvpArg);
+                        FunctionHelper.arithmeticOperation(aOp, dCtx, tvpArg, tvpSum, tvpSum);
+                    } else {
+                        throw new SystemException(ErrorCode.SYSE0001);
+                    }
+                }
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-vxquery/blob/99a4555a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
new file mode 100644
index 0000000..91657c6
--- /dev/null
+++ b/vxquery-core/src/main/java/org/apache/vxquery/runtime/functions/aggregate/AvgLocalAggregateEvaluatorFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.vxquery.runtime.functions.aggregate;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.vxquery.datamodel.accessors.TaggedValuePointable;
+import org.apache.vxquery.datamodel.builders.sequence.SequenceBuilder;
+import org.apache.vxquery.datamodel.values.ValueTag;
+import org.apache.vxquery.datamodel.values.XDMConstants;
+import org.apache.vxquery.exceptions.ErrorCode;
+import org.apache.vxquery.exceptions.SystemException;
+import org.apache.vxquery.runtime.functions.arithmetic.AddOperation;
+import org.apache.vxquery.runtime.functions.arithmetic.DivideOperation;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluator;
+import org.apache.vxquery.runtime.functions.base.AbstractTaggedValueArgumentAggregateEvaluatorFactory;
+import org.apache.vxquery.runtime.functions.util.FunctionHelper;
+
+import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IAggregateEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import edu.uci.ics.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import edu.uci.ics.hyracks.data.std.api.IPointable;
+import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
+
+public class AvgLocalAggregateEvaluatorFactory extends AbstractTaggedValueArgumentAggregateEvaluatorFactory {
+    private static final long serialVersionUID = 1L;
+
+    public AvgLocalAggregateEvaluatorFactory(IScalarEvaluatorFactory[] args) {
+        super(args);
+    }
+
+    @Override
+    protected IAggregateEvaluator createEvaluator(IScalarEvaluator[] args) throws AlgebricksException {
+        final TaggedValuePointable tvpCount = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+        final ArrayBackedValueStorage abvs = new ArrayBackedValueStorage();
+        final ArrayBackedValueStorage abvsSeq = new ArrayBackedValueStorage();
+        final SequenceBuilder sb = new SequenceBuilder();
+        final DataOutput dOut = abvs.getDataOutput();
+        final AddOperation aOp = new AddOperation();
+
+        return new AbstractTaggedValueArgumentAggregateEvaluator(args) {
+            long count;
+            TaggedValuePointable tvpSum = (TaggedValuePointable) TaggedValuePointable.FACTORY.createPointable();
+
+            @Override
+            public void init() throws AlgebricksException {
+                count = 0;
+            }
+
+            @Override
+            public void finish(IPointable result) throws AlgebricksException {
+                if (count == 0) {
+                    XDMConstants.setEmptySequence(result);
+                } else {
+                    // Set count as a TaggedValuePointable.
+                    try {
+                        abvs.reset();
+                        dOut.write(ValueTag.XS_INTEGER_TAG);
+                        dOut.writeLong(count);
+                        tvpCount.set(abvs);
+
+                        // Save intermediate result.
+                        abvsSeq.reset();
+                        sb.reset(abvsSeq);
+                        sb.addItem(tvpCount);
+                        sb.addItem(tvpSum);
+                        sb.finish();
+                        result.set(abvsSeq);
+                    } catch (Exception e) {
+                        throw new AlgebricksException(e);
+                    }
+                }
+            }
+
+            @Override
+            protected void step(TaggedValuePointable[] args) throws SystemException {
+                TaggedValuePointable tvp = args[0];
+                if (count == 0) {
+                    // Init.
+                    tvpSum.set(tvp);
+                } else {
+                    FunctionHelper.arithmeticOperation(aOp, dCtx, tvp, tvpSum, tvpSum);
+                }
+                count++;
+            }
+        };
+    }
+}
\ No newline at end of file