You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ji...@apache.org on 2016/02/14 10:39:58 UTC
[2/2] incubator-asterixdb-hyracks git commit: Intersect the 2ndary
indexes before primary search
Intersect the 2ndary indexes before primary search
The following commits from your working branch will be included:
Change-Id: Ic16c67c529ca19d8b1a5439ddef22760945fd0d7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/577
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/commit/a63d0f74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/tree/a63d0f74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/diff/a63d0f74
Branch: refs/heads/master
Commit: a63d0f743f8430d924055f4d51a5f8d9fa4d4f59
Parents: 7a27293
Author: Jianfeng Jia <ji...@gmail.com>
Authored: Sat Feb 13 10:06:24 2016 -0800
Committer: Jianfeng Jia <ji...@gmail.com>
Committed: Sun Feb 14 00:02:19 2016 -0800
----------------------------------------------------------------------
.../core/algebra/base/LogicalOperatorTag.java | 1 +
.../core/algebra/base/PhysicalOperatorTag.java | 1 +
.../operators/logical/IntersectOperator.java | 126 +++++++
.../visitors/FDsAndEquivClassesVisitor.java | 7 +
.../visitors/IsomorphismOperatorVisitor.java | 27 ++
.../IsomorphismVariableMappingVisitor.java | 24 ++
...OperatorDeepCopyWithNewVariablesVisitor.java | 28 ++
.../visitors/LogicalPropertiesVisitor.java | 6 +
.../visitors/OperatorDeepCopyVisitor.java | 11 +
.../visitors/ProducedVariableVisitor.java | 7 +
.../logical/visitors/SchemaVariableVisitor.java | 7 +
.../visitors/SubstituteVariableVisitor.java | 19 +
.../logical/visitors/UsedVariableVisitor.java | 13 +
.../operators/physical/IntersectPOperator.java | 158 ++++++++
.../operators/physical/UnionAllPOperator.java | 12 -
.../LogicalOperatorPrettyPrintVisitor.java | 31 ++
.../BroadcastPartitioningProperty.java | 4 +
.../properties/IPartitioningProperty.java | 20 +-
.../properties/OrderedPartitionedProperty.java | 9 +
.../properties/RandomPartitioningProperty.java | 4 +
.../UnorderedPartitionedProperty.java | 9 +
.../algebra/util/OperatorManipulationUtil.java | 6 +
.../visitors/ILogicalOperatorVisitor.java | 3 +
.../core/jobgen/impl/PlanCompiler.java | 12 +-
.../algebricks-examples/piglet-example/pom.xml | 1 +
.../SetAlgebricksPhysicalOperatorsRule.java | 6 +-
.../nc/resources/memory/FrameManager.java | 7 +-
.../common/comm/io/FrameFixedFieldAppender.java | 35 +-
hyracks/hyracks-dataflow-std/pom.xml | 88 ++---
.../intersect/IntersectOperatorDescriptor.java | 363 +++++++++++++++++++
.../unit/IntersectOperatorDescriptorTest.java | 222 ++++++++++++
.../hyracks/tests/util/InputFrameGenerator.java | 69 ++++
.../tests/util/MultiThreadTaskEmulator.java | 87 +++++
.../hyracks/tests/util/OutputFrameVerifier.java | 95 +++++
hyracks/hyracks-test-support/pom.xml | 1 -
35 files changed, 1435 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 7e9da44..977107c 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -53,4 +53,5 @@ public enum LogicalOperatorTag {
UPDATE,
WRITE,
WRITE_RESULT,
+ INTERSECT,
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index c581e82..82d0b0e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -73,4 +73,5 @@ public enum PhysicalOperatorTag {
UNNEST,
UPDATE,
WRITE_RESULT,
+ INTERSECT,
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
new file mode 100644
index 0000000..e64be2b
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/IntersectOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.FilteredVariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.typing.NonPropagatingTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
+
+/**
+ * Logical operator tagged {@link LogicalOperatorTag#INTERSECT}.
+ * <p>
+ * It keeps one list of variables per input plus one list of output variables. All lists must have the
+ * same length; position {@code i} of the output list corresponds to position {@code i} of every input
+ * list. The operator's schema consists of the output variables only.
+ */
+public class IntersectOperator extends AbstractLogicalOperator {
+
+    private final List<List<LogicalVariable>> inputVars;
+    private final List<LogicalVariable> outputVars;
+
+    /**
+     * @param outputVars
+     *            the variables exposed by this operator
+     * @param inputVars
+     *            one variable list per input; each list must be as long as {@code outputVars}
+     * @throws AlgebricksException
+     *             if no input list is given or the list sizes do not agree
+     */
+    public IntersectOperator(List<LogicalVariable> outputVars, List<List<LogicalVariable>> inputVars)
+            throws AlgebricksException {
+        // Guard first: inputVars.get(0) below would otherwise fail with an unchecked exception.
+        if (outputVars == null || inputVars == null || inputVars.isEmpty()) {
+            throw new AlgebricksException("The intersect operator requires output variables and at least one input");
+        }
+        if (outputVars.size() != inputVars.get(0).size()) {
+            throw new AlgebricksException("The number of output variables is different with the input variable number");
+        }
+        if (inputVars.stream().anyMatch(vlist -> vlist.size() != outputVars.size())) {
+            throw new AlgebricksException("The schemas of input variables are not consistent");
+        }
+        this.outputVars = outputVars;
+        this.inputVars = inputVars;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.INTERSECT;
+    }
+
+    /** The schema is exactly the list of output variables. */
+    @Override
+    public void recomputeSchema() throws AlgebricksException {
+        schema = outputVars;
+    }
+
+    /** This operator holds no expressions, so there is nothing to transform. */
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitIntersectOperator(this, arg);
+    }
+
+    @Override
+    public boolean isMap() {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return new FilteredVariablePropagationPolicy(outputVars);
+    }
+
+    /**
+     * Builds a fresh, non-propagating type environment in which every output variable gets the type of
+     * the variable at the same position in the first input. The remaining inputs are only compared
+     * against the first one; mismatches are logged, not rejected.
+     */
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        IVariableTypeEnvironment typeEnv = ctx.getOutputTypeEnvironment(inputs.get(0).getValue());
+
+        for (int i = 1; i < inputs.size(); i++) {
+            checkTypeConsistency(typeEnv, inputVars.get(0), ctx.getOutputTypeEnvironment(inputs.get(i).getValue()),
+                    inputVars.get(i));
+        }
+
+        IVariableTypeEnvironment env = new NonPropagatingTypeEnvironment(ctx.getExpressionTypeComputer(),
+                ctx.getMetadataProvider());
+        for (int i = 0; i < outputVars.size(); i++) {
+            env.setVarType(outputVars.get(i), typeEnv.getVarType(inputVars.get(0).get(i)));
+        }
+        // Return the environment populated above; returning typeEnv (the first input's environment)
+        // would throw away the types just assigned to the output variables.
+        return env;
+    }
+
+    public List<LogicalVariable> getOutputVars() {
+        return outputVars;
+    }
+
+    /** @return the number of input variable lists this operator was built with */
+    public int getNumInput() {
+        return inputVars.size();
+    }
+
+    /** @return the variable list registered for the input at {@code inputIndex} */
+    public List<LogicalVariable> getInputVariables(int inputIndex) {
+        return inputVars.get(inputIndex);
+    }
+
+    /**
+     * Compares, position by position, the types of two variable lists and logs a warning for every
+     * pair whose types differ. A type that is missing on both sides counts as equal.
+     */
+    private void checkTypeConsistency(IVariableTypeEnvironment expected, List<LogicalVariable> expectedVariables,
+            IVariableTypeEnvironment actual, List<LogicalVariable> actualVariables) throws AlgebricksException {
+        for (int i = 0; i < expectedVariables.size(); i++) {
+            Object expectedType = expected.getVarType(expectedVariables.get(i));
+            Object actualType = actual.getVarType(actualVariables.get(i));
+            // Null-safe comparison: an unknown type must produce a warning, not a NullPointerException.
+            boolean sameType = expectedType == null ? actualType == null : expectedType.equals(actualType);
+            if (!sameType) {
+                AlgebricksConfig.ALGEBRICKS_LOGGER
+                        .warning("Type of two variables are not equal. " + expectedVariables.get(i) + " is of type: "
+                                + expectedType + "; " + actualVariables.get(i) + " is of type: " + actualType);
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index b2c97c3..398b4d2 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -475,6 +476,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext ctx) throws AlgebricksException { // visitor hook for IntersectOperator
+ setEmptyFDsEqClasses(op, ctx); // presumably registers empty FD / equivalence-class sets for op, i.e. nothing is derived from its inputs - TODO confirm against the helper
+ return null; // Void visitor: there is no result value
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext ctx) throws AlgebricksException {
fdsEqClassesForAbstractUnnestOperator(op, ctx);
return null;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index 7a4e7e1..eb6cd15 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -48,6 +48,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -331,6 +332,32 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
}
@Override
+ public Boolean visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException {
+     // Decides whether arg is an intersect operator whose output variables and per-input variable
+     // lists match those of op (each list compared as an unordered set, after variable substitution).
+     //
+     // The tag of the *other* operator must be checked: op is always an INTERSECT here, so testing
+     // op never rejected anything and let the IntersectOperator cast below fail on a mismatch.
+     AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
+     if (aop.getOperatorTag() != LogicalOperatorTag.INTERSECT) {
+         return Boolean.FALSE;
+     }
+     IntersectOperator intersectOpArg = (IntersectOperator) copyAndSubstituteVar(op, arg);
+     List<LogicalVariable> variables = op.getOutputVars();
+     List<LogicalVariable> variablesArg = intersectOpArg.getOutputVars();
+     if (variables.size() != variablesArg.size()) {
+         return Boolean.FALSE;
+     }
+     if (!VariableUtilities.varListEqualUnordered(variables, variablesArg)) {
+         return Boolean.FALSE;
+     }
+
+     if (op.getNumInput() != intersectOpArg.getNumInput()) {
+         return Boolean.FALSE;
+     }
+     for (int i = 0; i < op.getNumInput(); i++) {
+         if (!VariableUtilities.varListEqualUnordered(op.getInputVariables(i),
+                 intersectOpArg.getInputVariables(i))) {
+             return Boolean.FALSE;
+         }
+     }
+     return Boolean.TRUE;
+ }
+
+ @Override
public Boolean visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
if (aop.getOperatorTag() != LogicalOperatorTag.UNNEST)
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index c46ffde..7b0b944 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -202,6 +203,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) throws AlgebricksException { // visitor hook: builds the variable mapping between op and arg
+ mapChildren(op, arg); // runs before the intersect-specific step; presumably so the inputs' variables are already mapped when that step compares them - TODO confirm
+ mapVariablesForIntersect(op, arg); // maps the output variables, but only if all input variables are already equivalent
+ return null; // Void visitor: the result is the side effect on the variable mapping
+ }
+
+ @Override
public Void visitUnnestOperator(UnnestOperator op, ILogicalOperator arg) throws AlgebricksException {
mapVariablesStandard(op, arg);
return null;
@@ -428,6 +436,22 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
}
}
+ private void mapVariablesForIntersect(IntersectOperator op, ILogicalOperator arg) {
+     // Maps the output variables of op onto those of arg, but only when arg is an intersect
+     // operator with the same number of inputs and every input variable of op is already known
+     // to be equivalent to the variable at the same position in arg. Otherwise nothing is mapped.
+     if (!(arg instanceof IntersectOperator)) {
+         // Avoid a ClassCastException when the two plans differ at this node.
+         return;
+     }
+     IntersectOperator opArg = (IntersectOperator) arg;
+     int numInputs = op.getNumInput();
+     if (numInputs != opArg.getNumInput()) {
+         return;
+     }
+     for (int i = 0; i < numInputs; i++) {
+         int numVars = op.getInputVariables(i).size();
+         if (numVars != opArg.getInputVariables(i).size()) {
+             // Lists of different length cannot match position by position; this also keeps the
+             // index below within bounds for opArg.
+             return;
+         }
+         for (int j = 0; j < numVars; j++) {
+             if (!varEquivalent(op.getInputVariables(i).get(j), opArg.getInputVariables(i).get(j))) {
+                 return;
+             }
+         }
+     }
+     mapVariables(op.getOutputVars(), opArg.getOutputVars());
+ }
+
private boolean varEquivalent(LogicalVariable left, LogicalVariable right) {
if (variableMapping.get(right) == null)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index b3f6639..9d83ae5 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOpe
import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalDataLookupOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -464,6 +465,33 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
}
@Override
+ public ILogicalOperator visitIntersectOperator(IntersectOperator op, ILogicalOperator arg) // deep-copies an IntersectOperator with fresh variables
+ throws AlgebricksException {
+ List<List<LogicalVariable>> liveVarsInInputs = getLiveVarsInInputs(op); // NOTE(review): these are ALL live variables of each copied input, not the copies of op.getInputVariables(i) - confirm this is intended
+ List<LogicalVariable> outputCopy = new ArrayList<>();
+ for (LogicalVariable var : op.getOutputVars()){
+ outputCopy.add(deepCopyVariable(var)); // one copied variable per original output variable, same order
+ }
+ IntersectOperator opCopy = new IntersectOperator(outputCopy, liveVarsInInputs); // the constructor throws if any input list differs in size from outputCopy
+ deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy); // presumably attaches copied inputs, annotations and execution mode to opCopy - TODO confirm
+ return opCopy;
+ }
+
+ private List<List<LogicalVariable>> getLiveVarsInInputs(AbstractLogicalOperator op) throws AlgebricksException { // returns, per input of op, the live variables of a deep copy of that input
+ List<Mutable<ILogicalOperator>> copiedInputs = new ArrayList<>();
+ for (Mutable<ILogicalOperator> childRef : op.getInputs()) {
+ copiedInputs.add(deepCopyOperatorReference(childRef, null)); // copy every input, keeping the input order
+ }
+ List<List<LogicalVariable>> liveVarsInInputs = new ArrayList<>();
+ for (Mutable<ILogicalOperator> inputOpRef : copiedInputs) {
+ List<LogicalVariable> liveVars = new ArrayList<>();
+ VariableUtilities.getLiveVariables(inputOpRef.getValue(), liveVars); // fills liveVars for this copied input
+ liveVarsInInputs.add(liveVars);
+ }
+ return liveVarsInInputs; // NOTE(review): copiedInputs is only used to read live variables and is not returned - check whether callers copy the inputs a second time
+ }
+
+ @Override
public ILogicalOperator visitUnnestMapOperator(UnnestMapOperator op, ILogicalOperator arg)
throws AlgebricksException {
UnnestMapOperator opCopy = new UnnestMapOperator(deepCopyVariableList(op.getVariables()),
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 4e5b13e..9de0992 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalData
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -229,6 +230,11 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, IOptimizationContext arg) throws AlgebricksException { // visitor hook for IntersectOperator
+ return null; // intentionally a no-op: nothing is computed or stored for this operator here
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, IOptimizationContext arg) throws AlgebricksException {
// TODO Auto-generated method stub
return null;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 2e402fc..8d436d1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalData
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -206,6 +207,16 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
}
@Override
+ public ILogicalOperator visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+     // Builds a new IntersectOperator from copies of the variable lists of op. Only the lists are
+     // copied; the LogicalVariable elements themselves are shared with the original.
+     final int numInputs = op.getNumInput();
+     List<List<LogicalVariable>> inputVarsCopy = new ArrayList<>(numInputs);
+     for (int inputIdx = 0; inputIdx < numInputs; inputIdx++) {
+         List<LogicalVariable> sourceVars = op.getInputVariables(inputIdx);
+         inputVarsCopy.add(new ArrayList<>(sourceVars));
+     }
+     List<LogicalVariable> outputVarsCopy = new ArrayList<>(op.getOutputVars());
+     return new IntersectOperator(outputVarsCopy, inputVarsCopy);
+ }
+
+ @Override
public ILogicalOperator visitUnnestOperator(UnnestOperator op, Void arg) throws AlgebricksException {
return new UnnestOperator(op.getVariable(), deepCopyExpressionRef(op.getExpressionRef()),
op.getPositionalVariable(), op.getPositionalVariableType(), op.getPositionWriter());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 8df772b..d23ff94 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -204,6 +205,12 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { // visitor hook for IntersectOperator
+ producedVariables.addAll(op.getOutputVars()); // the operator's output variables are reported as the variables it produces
+ return null; // Void visitor: the result is accumulated in producedVariables
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
producedVariables.addAll(op.getVariables());
return null;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index b488df1..5c6d3c3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -44,6 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalData
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -230,6 +231,12 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException { // visitor hook for IntersectOperator
+ VariableUtilities.getProducedVariables(op, schemaVariables); // presumably adds the variables produced by op to schemaVariables; input variables are not added here - TODO confirm against VariableUtilities
+ return null; // Void visitor: the result is accumulated in schemaVariables
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws AlgebricksException {
if (op.propagatesInput()) {
standardLayout(op);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 91ff073..d8e25f7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -42,6 +42,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExternalData
import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
@@ -303,6 +304,24 @@ public class SubstituteVariableVisitor
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+         throws AlgebricksException {
+     // Replace every occurrence of pair.first with pair.second, first among the output
+     // variables and then among the variables of each input branch. Lists are edited in place.
+     List<LogicalVariable> outputVars = op.getOutputVars();
+     for (int pos = 0; pos < outputVars.size(); pos++) {
+         if (!outputVars.get(pos).equals(pair.first)) {
+             continue;
+         }
+         outputVars.set(pos, pair.second);
+     }
+
+     for (int branch = 0; branch < op.getNumInput(); branch++) {
+         List<LogicalVariable> branchVars = op.getInputVariables(branch);
+         for (int pos = 0; pos < branchVars.size(); pos++) {
+             if (!branchVars.get(pos).equals(pair.first)) {
+                 continue;
+             }
+             branchVars.set(pos, pair.second);
+         }
+     }
+     return null;
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Pair<LogicalVariable, LogicalVariable> pair)
throws AlgebricksException {
List<LogicalVariable> variables = op.getVariables();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index cfc57c2..ef02feb 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -297,6 +298,18 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
}
@Override
+ public Void visitIntersectOperator(IntersectOperator op, Void arg) throws AlgebricksException {
+     // Every variable listed for any input branch is recorded in usedVariables,
+     // skipping those that are already present.
+     for (int branch = 0; branch < op.getNumInput(); branch++) {
+         for (LogicalVariable inputVar : op.getInputVariables(branch)) {
+             if (usedVariables.contains(inputVar)) {
+                 continue;
+             }
+             usedVariables.add(inputVar);
+         }
+     }
+     return null;
+ }
+
+ @Override
public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) {
op.getExpressionRef().getValue().getUsedVariables(usedVariables);
if (op.getAdditionalFilteringExpressions() != null) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
new file mode 100644
index 0000000..b6d0f1f
--- /dev/null
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/IntersectPOperator.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPartitioningRequirementsCoordinator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.LocalOrderProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.OrderColumn;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.intersect.IntersectOperatorDescriptor;
+
+/**
+ * Physical operator for {@link IntersectOperator}.
+ * <p>
+ * Each input is required to be locally ordered (ascending) on its intersect variables, and the
+ * output is reported as ordered (ascending) on the output variables. At job generation time the
+ * operator contributes an {@link IntersectOperatorDescriptor}.
+ */
+public class IntersectPOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.INTERSECT;
+    }
+
+    /**
+     * Builds one requirement vector per input: an ascending local order on that input's
+     * intersect variables, plus a random partitioning property when the operator's execution
+     * mode is PARTITIONED (otherwise the partitioning requirement is {@code null}).
+     */
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator iop,
+            IPhysicalPropertiesVector reqdByParent) {
+        IntersectOperator intersectOp = (IntersectOperator) iop;
+        int numInputs = intersectOp.getNumInput();
+        StructuralPropertiesVector[] childRequirements = new StructuralPropertiesVector[numInputs];
+        for (int input = 0; input < numInputs; input++) {
+            List<ILocalStructuralProperty> localProps = new ArrayList<>();
+            localProps.add(ascendingOrderOn(intersectOp.getInputVariables(input)));
+            IPartitioningProperty partitioning =
+                    intersectOp.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED
+                            ? new RandomPartitioningProperty(null) : null;
+            childRequirements[input] = new StructuralPropertiesVector(partitioning, localProps);
+        }
+        return new PhysicalRequirements(childRequirements, IPartitioningRequirementsCoordinator.NO_COORDINATION);
+    }
+
+    /**
+     * Delivers the partitioning property of the first input, with that input's intersect
+     * variables renamed to the output variables, together with an ascending local order on
+     * the output variables.
+     */
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator iop, IOptimizationContext context)
+            throws AlgebricksException {
+        IntersectOperator intersectOp = (IntersectOperator) iop;
+        ILogicalOperator firstInput = intersectOp.getInputs().get(0).getValue();
+        IPartitioningProperty partitioning = firstInput.getDeliveredPhysicalProperties().getPartitioningProperty();
+
+        // Positional mapping: i-th variable of input 0 -> i-th output variable.
+        List<LogicalVariable> outputVars = intersectOp.getOutputVars();
+        HashMap<LogicalVariable, LogicalVariable> inputToOutput = new HashMap<>(outputVars.size());
+        for (int i = 0; i < outputVars.size(); i++) {
+            inputToOutput.put(intersectOp.getInputVariables(0).get(i), outputVars.get(i));
+        }
+        // NOTE(review): the rename is applied to the object returned by the first input's delivered
+        // properties; if that is not a private copy, the child sees the renamed columns too - confirm.
+        partitioning.substituteColumnVars(inputToOutput);
+
+        List<ILocalStructuralProperty> localProps = new ArrayList<>();
+        localProps.add(ascendingOrderOn(outputVars));
+        deliveredProperties = new StructuralPropertiesVector(partitioning, localProps);
+    }
+
+    /**
+     * Creates the {@link IntersectOperatorDescriptor} for this operator and wires output 0 of
+     * every input operator to the matching input port.
+     */
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        // logical op should have checked all the mismatch issues.
+        IntersectOperator logicalOp = (IntersectOperator) op;
+        int nInput = logicalOp.getNumInput();
+
+        // Comparators are derived from the variables of the first input.
+        IBinaryComparatorFactory[] comparatorFactories = JobGenHelper.variablesToAscBinaryComparatorFactories(
+                logicalOp.getInputVariables(0), context.getTypeEnvironment(op), context);
+
+        // The normalized key computer is optional; it is based on the first variable of the first input.
+        INormalizedKeyComputerFactory nkcf = null;
+        INormalizedKeyComputerFactoryProvider nkcfProvider = context.getNormalizedKeyComputerFactoryProvider();
+        if (nkcfProvider != null) {
+            Object firstKeyType = context.getTypeEnvironment(op).getVarType(logicalOp.getInputVariables(0).get(0));
+            if (firstKeyType != null) {
+                nkcf = nkcfProvider.getNormalizedKeyComputerFactory(firstKeyType, true);
+            }
+        }
+
+        // Field positions of the intersect variables inside each input's schema.
+        int[][] compareFields = new int[nInput][];
+        for (int input = 0; input < nInput; input++) {
+            compareFields[input] =
+                    JobGenHelper.variablesToFieldIndexes(logicalOp.getInputVariables(input), inputSchemas[input]);
+        }
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+
+        final IntersectOperatorDescriptor opDescriptor;
+        try {
+            opDescriptor = new IntersectOperatorDescriptor(spec, nInput, compareFields, nkcf, comparatorFactories,
+                    recordDescriptor);
+        } catch (HyracksException e) {
+            throw new AlgebricksException(e);
+        }
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, opDescriptor);
+        for (int input = 0; input < op.getInputs().size(); input++) {
+            builder.contributeGraphEdge(op.getInputs().get(input).getValue(), 0, op, input);
+        }
+    }
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+
+    /** Builds a local order property that sorts ascending on {@code variables}, in list order. */
+    private static LocalOrderProperty ascendingOrderOn(List<LogicalVariable> variables) {
+        List<OrderColumn> columns = new ArrayList<>();
+        for (LogicalVariable variable : variables) {
+            columns.add(new OrderColumn(variable, OrderOperator.IOrder.OrderKind.ASC));
+        }
+        return new LocalOrderProperty(columns);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
index 4d51bf0..2c407b7 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/UnionAllPOperator.java
@@ -75,18 +75,6 @@ public class UnionAllPOperator extends AbstractPhysicalOperator {
public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
throws AlgebricksException {
- UnionAllOperator unionOp = (UnionAllOperator) op;
- int n = unionOp.getVariableMappings().size();
- int[] leftColumns = new int[n];
- int[] rightColumns = new int[n];
- int i = 0;
- for (Triple<LogicalVariable, LogicalVariable, LogicalVariable> t : unionOp.getVariableMappings()) {
- int posLeft = inputSchemas[0].findVariable(t.first);
- leftColumns[i] = posLeft;
- int posRight = inputSchemas[1].findVariable(t.second);
- rightColumns[i] = posRight;
- ++i;
- }
IOperatorDescriptorRegistry spec = builder.getJobSpec();
RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema, context);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index e310ff9..0514fbd 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -42,6 +42,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertD
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -253,6 +254,36 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
}
@Override
+ public String visitIntersectOperator(IntersectOperator op, Integer indent) throws AlgebricksException {
+     // Rendered shape: intersect ([out1, out2] <- [[a1, a2], [b1, b2]])
+     StringBuilder text = new StringBuilder();
+     addIndent(text, indent).append("intersect (");
+
+     // Output variables.
+     text.append('[');
+     String separator = "";
+     for (int pos = 0; pos < op.getOutputVars().size(); pos++) {
+         text.append(separator);
+         text.append(op.getOutputVars().get(pos));
+         separator = ", ";
+     }
+     text.append("] <- [");
+
+     // One bracketed variable list per input branch.
+     for (int branch = 0; branch < op.getNumInput(); branch++) {
+         if (branch != 0) {
+             text.append(", ");
+         }
+         text.append('[');
+         separator = "";
+         for (int pos = 0; pos < op.getInputVariables(branch).size(); pos++) {
+             text.append(separator);
+             text.append(op.getInputVariables(branch).get(pos));
+             separator = ", ";
+         }
+         text.append(']');
+     }
+     text.append("])");
+     return text.toString();
+ }
+
+ @Override
public String visitUnnestOperator(UnnestOperator op, Integer indent) throws AlgebricksException {
StringBuilder buffer = new StringBuilder();
addIndent(buffer, indent).append("unnest " + op.getVariable());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
index 5926a20..f3ba030 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/BroadcastPartitioningProperty.java
@@ -57,4 +57,8 @@ public class BroadcastPartitioningProperty implements IPartitioningProperty {
this.domain = domain;
}
+ @Override
+ public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+ // No-op: varMap is ignored by this implementation.
+ // NOTE(review): presumably a broadcast property holds no column variables to rename - confirm.
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
index 3142d10..89ac374 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/IPartitioningProperty.java
@@ -26,11 +26,11 @@ import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
public interface IPartitioningProperty extends IStructuralProperty {
- public enum PartitioningType {
+ enum PartitioningType {
UNPARTITIONED, RANDOM, BROADCAST, UNORDERED_PARTITIONED, ORDERED_PARTITIONED
}
- static final INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() {
+ INodeDomain DOMAIN_FOR_UNPARTITIONED_DATA = new INodeDomain() {
@Override
public boolean sameAs(INodeDomain domain) {
return domain == this;
@@ -42,7 +42,7 @@ public interface IPartitioningProperty extends IStructuralProperty {
}
};
- public static final IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() {
+ IPartitioningProperty UNPARTITIONED = new IPartitioningProperty() {
@Override
public PartitioningType getPartitioningType() {
@@ -72,14 +72,20 @@ public interface IPartitioningProperty extends IStructuralProperty {
public void setNodeDomain(INodeDomain domain) {
throw new IllegalStateException();
}
+
+ @Override
+ public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> variableMap) {
+ // No-op: variableMap is ignored by the UNPARTITIONED constant.
+ }
};
- public abstract PartitioningType getPartitioningType();
+ PartitioningType getPartitioningType();
- public abstract void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
+ void normalize(Map<LogicalVariable, EquivalenceClass> equivalenceClasses,
List<FunctionalDependency> fds);
- public abstract INodeDomain getNodeDomain();
+ INodeDomain getNodeDomain();
+
+ void setNodeDomain(INodeDomain domain);
- public abstract void setNodeDomain(INodeDomain domain);
+ /**
+ * Renames, in place, the column variables held by this property: a variable that appears as a
+ * key in {@code varMap} is replaced by the value it maps to. Implementations that hold no
+ * column variables leave this method empty.
+ *
+ * @param varMap
+ *            mapping from the variable to replace to its replacement
+ */
+ void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index f28bc56..5808da1 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -81,4 +81,13 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
this.domain = domain;
}
+ @Override
+ public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+     // Rewrite each order column in place when its variable is a key of varMap;
+     // columns whose variable is not mapped are left untouched.
+     for (OrderColumn orderColumn : orderColumns) {
+         LogicalVariable current = orderColumn.getColumn();
+         if (!varMap.containsKey(current)) {
+             continue;
+         }
+         orderColumn.setColumn(varMap.get(current));
+     }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
index 24fe8e7..917fdd8 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/RandomPartitioningProperty.java
@@ -62,4 +62,8 @@ public class RandomPartitioningProperty implements IPartitioningProperty {
this.domain = domain;
}
+ @Override
+ public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+ // No-op: varMap is ignored by this implementation.
+ // NOTE(review): presumably a random partitioning holds no column variables to rename - confirm.
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
index de3b102..17e0cb3 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/UnorderedPartitionedProperty.java
@@ -65,4 +65,13 @@ public final class UnorderedPartitionedProperty extends AbstractGroupingProperty
this.domain = domain;
}
+ @Override
+ public void substituteColumnVars(Map<LogicalVariable, LogicalVariable> varMap) {
+     // Apply the renaming as one simultaneous substitution, in two phases: first remove every
+     // mapped variable and remember its replacement, then add all replacements. Adding while
+     // still walking varMap would let a later entry rename a variable that an earlier entry
+     // just introduced (a->b followed by b->c would turn {a} into {c}), and the outcome would
+     // depend on the iteration order of varMap.
+     java.util.List<LogicalVariable> replacements = new java.util.ArrayList<>();
+     for (Map.Entry<LogicalVariable, LogicalVariable> entry : varMap.entrySet()) {
+         if (columnSet.remove(entry.getKey())) {
+             replacements.add(entry.getValue());
+         }
+     }
+     columnSet.addAll(replacements);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
index b5099b1..0aa6676 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/util/OperatorManipulationUtil.java
@@ -226,6 +226,12 @@ public class OperatorManipulationUtil {
return op.accept(visitor, null);
}
+ /**
+  * Copies {@code op} through an {@link OperatorDeepCopyVisitor} and sets the execution mode of
+  * the returned copy to the execution mode of {@code op}. Only the returned operator's mode is
+  * set by this method.
+  *
+  * @param op
+  *            the operator to copy
+  * @return the copy, carrying the same execution mode as {@code op}
+  * @throws AlgebricksException
+  *             propagated from the copy visitor
+  */
+ public static ILogicalOperator deepCopyWithExcutionMode(ILogicalOperator op) throws AlgebricksException {
+     AbstractLogicalOperator copy = (AbstractLogicalOperator) op.accept(new OperatorDeepCopyVisitor(), null);
+     copy.setExecutionMode(op.getExecutionMode());
+     return copy;
+ }
/**
* Compute type environment of a newly generated operator {@code op} and its input.
*
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index 53c8b69..6509e2a 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -32,6 +32,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
@@ -95,6 +96,8 @@ public interface ILogicalOperatorVisitor<R, T> {
public R visitUnionOperator(UnionAllOperator op, T arg) throws AlgebricksException;
+ /**
+ * Visits an {@link IntersectOperator}.
+ *
+ * @param op
+ *            the intersect operator being visited
+ * @param arg
+ *            the visitor-specific argument
+ * @return the visitor-specific result
+ * @throws AlgebricksException
+ */
+ public R visitIntersectOperator(IntersectOperator op, T arg) throws AlgebricksException;
+
public R visitUnnestOperator(UnnestOperator op, T arg) throws AlgebricksException;
public R visitOuterUnnestOperator(OuterUnnestOperator op, T arg) throws AlgebricksException;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
index 186ac6f..1a61f2e 100644
--- a/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
+++ b/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/PlanCompiler.java
@@ -76,18 +76,18 @@ public class PlanCompiler {
int n = op.getInputs().size();
IOperatorSchema[] schemas = new IOperatorSchema[n];
int i = 0;
- for (Mutable<ILogicalOperator> opRef2 : op.getInputs()) {
- List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opRef2);
+ for (Mutable<ILogicalOperator> opChild : op.getInputs()) {
+ List<Mutable<ILogicalOperator>> parents = operatorVisitedToParents.get(opChild);
if (parents == null) {
parents = new ArrayList<Mutable<ILogicalOperator>>();
- operatorVisitedToParents.put(opRef2, parents);
+ operatorVisitedToParents.put(opChild, parents);
parents.add(opRef);
- compileOpRef(opRef2, spec, builder, outerPlanSchema);
- schemas[i++] = context.getSchema(opRef2.getValue());
+ compileOpRef(opChild, spec, builder, outerPlanSchema);
+ schemas[i++] = context.getSchema(opChild.getValue());
} else {
if (!parents.contains(opRef))
parents.add(opRef);
- schemas[i++] = context.getSchema(opRef2.getValue());
+ schemas[i++] = context.getSchema(opChild.getValue());
continue;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-examples/piglet-example/pom.xml
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-examples/piglet-example/pom.xml b/algebricks/algebricks-examples/piglet-example/pom.xml
index ae2ec51..20bc586 100644
--- a/algebricks/algebricks-examples/piglet-example/pom.xml
+++ b/algebricks/algebricks-examples/piglet-example/pom.xml
@@ -48,6 +48,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.10</version>
<executions>
<execution>
<id>add-source</id>
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
----------------------------------------------------------------------
diff --git a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index e99d126..73bba8f 100644
--- a/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -64,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.InMemorySta
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexBulkloadPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.IndexInsertDeleteUpsertPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.InsertDeleteUpsertPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.IntersectPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MicroPreclusteredGroupByPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.NestedTupleSourcePOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDistinctByPOperator;
@@ -249,7 +250,10 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
op.setPhysicalOperator(new UnionAllPOperator());
break;
}
-
+ case INTERSECT: {
+ op.setPhysicalOperator(new IntersectPOperator());
+ break;
+ }
case UNNEST: {
op.setPhysicalOperator(new UnnestPOperator());
break;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
index 3d3ed8d..8df1f38 100644
--- a/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
+++ b/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/resources/memory/FrameManager.java
@@ -52,7 +52,8 @@ public class FrameManager implements IHyracksFrameMgrContext {
ByteBuffer buffer = ByteBuffer.allocate(bytes);
if (bytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) {
throw new HyracksDataException(
- "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME + " bytes");
+ "Unable to allocate frame larger than:" + FrameConstants.MAX_NUM_MINFRAME * minFrameSize
+ + " bytes");
}
FrameHelper.serializeFrameSize(buffer, (byte) (bytes / minFrameSize));
return (ByteBuffer) buffer.clear();
@@ -74,8 +75,8 @@ public class FrameManager implements IHyracksFrameMgrContext {
buffer.position(pos);
if (newSizeInBytes / minFrameSize > FrameConstants.MAX_NUM_MINFRAME) {
- throw new HyracksDataException("Unable to allocate frame of size bigger than MinFrameSize * "
- + FrameConstants.MAX_NUM_MINFRAME);
+ throw new HyracksDataException("Unable to allocate frame of size bigger than: "
+ + FrameConstants.MAX_NUM_MINFRAME * minFrameSize + " bytes");
}
FrameHelper.serializeFrameSize(buffer, (byte) (newSizeInBytes / minFrameSize));
return buffer;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
index 21a7a71..e62e9e7 100644
--- a/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
+++ b/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameFixedFieldAppender.java
@@ -49,18 +49,22 @@ public class FrameFixedFieldAppender extends AbstractFrameAppender implements IF
leftOverSize = 0;
}
+ /**
+ * Resets the frame state and copies the leftover data into the new frame.
+ *
+ * @param frame the new frame to append into
+ * @throws HyracksDataException if the frame cannot be extended to hold the leftover data
+ */
+ public void resetWithLeftOverData(IFrame frame) throws HyracksDataException {
+ super.reset(frame, true);
+ copyLeftOverDataFromeBufferToFrame();
+ }
+
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
super.write(outWriter, clearFrame);
if (clearFrame) {
- if (leftOverSize > 0) {
- if (!canHoldNewTuple(0, leftOverSize)) {
- throw new HyracksDataException(
- "The given frame can not be extended to insert the leftover data from the last record");
- }
- System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize);
- leftOverSize = 0;
- }
+ copyLeftOverDataFromeBufferToFrame();
}
}
@@ -85,13 +89,13 @@ public class FrameFixedFieldAppender extends AbstractFrameAppender implements IF
return true;
} else {
if (currentField > 0) {
- copyLeftOverData();
+ copyLeftOverDataFromFrameToBuffer();
}
return false;
}
}
- private void copyLeftOverData() {
+ private void copyLeftOverDataFromFrameToBuffer() {
leftOverSize = lastFieldEndOffset + fieldCount * 4;
if (cachedLeftOverFields == null || cachedLeftOverFields.length < leftOverSize) {
cachedLeftOverFields = new byte[leftOverSize];
@@ -99,6 +103,17 @@ public class FrameFixedFieldAppender extends AbstractFrameAppender implements IF
System.arraycopy(array, tupleDataEndOffset, cachedLeftOverFields, 0, leftOverSize);
}
+ private void copyLeftOverDataFromeBufferToFrame() throws HyracksDataException {
+ if (leftOverSize > 0) {
+ if (!canHoldNewTuple(0, leftOverSize)) {
+ throw new HyracksDataException(
+ "The given frame can not be extended to insert the leftover data from the last record");
+ }
+ System.arraycopy(cachedLeftOverFields, 0, array, tupleDataEndOffset, leftOverSize);
+ leftOverSize = 0;
+ }
+ }
+
public boolean appendField(IFrameTupleAccessor fta, int tIndex, int fIndex) throws HyracksDataException {
int startOffset = fta.getTupleStartOffset(tIndex);
int fStartOffset = fta.getFieldStartOffset(tIndex, fIndex);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb-hyracks/blob/a63d0f74/hyracks/hyracks-dataflow-std/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks/hyracks-dataflow-std/pom.xml b/hyracks/hyracks-dataflow-std/pom.xml
index 949ea38..b01789c 100644
--- a/hyracks/hyracks-dataflow-std/pom.xml
+++ b/hyracks/hyracks-dataflow-std/pom.xml
@@ -17,52 +17,52 @@
! under the License.
!-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <artifactId>hyracks-dataflow-std</artifactId>
- <name>hyracks-dataflow-std</name>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>hyracks-dataflow-std</artifactId>
+ <name>hyracks-dataflow-std</name>
- <parent>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- </parent>
+ <parent>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ </parent>
- <licenses>
- <license>
- <name>Apache License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- <distribution>repo</distribution>
- <comments>A business-friendly OSS license</comments>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>Apache License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ <distribution>repo</distribution>
+ <comments>A business-friendly OSS license</comments>
+ </license>
+ </licenses>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-api</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-dataflow-common</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <type>jar</type>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-control-nc</artifactId>
- <version>0.2.17-SNAPSHOT</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-api</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-dataflow-common</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <type>jar</type>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hyracks</groupId>
+ <artifactId>hyracks-control-nc</artifactId>
+ <version>0.2.17-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>