Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:08 UTC
[29/53] [abbrv] flink git commit: [optimizer] Rename optimizer project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
deleted file mode 100644
index c2e81a8..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/InterestingPropertyVisitor.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.costs.CostEstimator;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-/**
- * Visitor that computes the interesting properties for each node in the optimizer DAG. On its recursive
- * depth-first descent, it propagates all interesting properties top-down.
- */
-public class InterestingPropertyVisitor implements Visitor<OptimizerNode> {
-
- private CostEstimator estimator; // the cost estimator for maximal costs of an interesting property
-
- /**
- * Creates a new visitor that computes the interesting properties for all nodes in the plan.
- * It uses the given cost estimator to compute the maximal costs for an interesting property.
- *
- * @param estimator
- * The cost estimator to estimate the maximal costs for interesting properties.
- */
- public InterestingPropertyVisitor(CostEstimator estimator) {
- this.estimator = estimator;
- }
-
- @Override
- public boolean preVisit(OptimizerNode node) {
- // The interesting properties must be computed on the descent. In case a node has multiple outputs,
- // that computation must happen during the last descent.
-
- if (node.getInterestingProperties() == null && node.haveAllOutputConnectionInterestingProperties()) {
- node.computeUnionOfInterestingPropertiesFromSuccessors();
- node.computeInterestingPropertiesForInputs(this.estimator);
- return true;
- } else {
- return false;
- }
- }
-
- @Override
- public void postVisit(OptimizerNode visitable) {}
-}
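
For orientation: a visitor like this is driven by starting the traversal at the sinks of the optimizer DAG, so that the interesting properties flow from the sinks toward the sources. A minimal driver sketch (the sinkNodes and costEstimator variables are illustrative, not part of this commit):

    // hypothetical driver, e.g. inside the optimizer's compile routine
    InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(costEstimator);
    for (DataSinkNode sink : sinkNodes) {
        sink.accept(propsVisitor);  // depth-first descent from each sink;
                                    // preVisit() defers the computation until
                                    // all outgoing connections have been seen
    }
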
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
deleted file mode 100644
index 58aa3c1..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/PlanFinalizer.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.IterationPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.optimizer.plan.WorksetPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/**
- * This visitor traverses the selected execution plan and finalizes it:
- *
- * <ul>
- * <li>The graph of nodes is double-linked (links from child to parent are inserted).</li>
- * <li>If unions join static and dynamic paths, the cache is marked as a memory consumer.</li>
- * <li>Relative memory fractions are assigned to all nodes.</li>
- * <li>All nodes are collected into a set.</li>
- * </ul>
- */
-public class PlanFinalizer implements Visitor<PlanNode> {
-
- private final Set<PlanNode> allNodes; // a set of all nodes in the optimizer plan
-
- private final List<SourcePlanNode> sources; // all data source nodes in the optimizer plan
-
- private final List<SinkPlanNode> sinks; // all data sink nodes in the optimizer plan
-
- private final Deque<IterationPlanNode> stackOfIterationNodes;
-
- private int memoryConsumerWeights; // a counter of all memory consumers
-
- /**
- * Creates a new plan finalizer.
- */
- public PlanFinalizer() {
- this.allNodes = new HashSet<PlanNode>();
- this.sources = new ArrayList<SourcePlanNode>();
- this.sinks = new ArrayList<SinkPlanNode>();
- this.stackOfIterationNodes = new ArrayDeque<IterationPlanNode>();
- }
-
- public OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName, Plan originalPlan) {
- this.memoryConsumerWeights = 0;
-
- // traverse the graph
- for (SinkPlanNode node : sinks) {
- node.accept(this);
- }
-
- // assign the memory to each node
- if (this.memoryConsumerWeights > 0) {
- for (PlanNode node : this.allNodes) {
- // assign memory to the driver strategy of the node
- final int consumerWeight = node.getMemoryConsumerWeight();
- if (consumerWeight > 0) {
- final double relativeMem = (double)consumerWeight / this.memoryConsumerWeights;
- node.setRelativeMemoryPerSubtask(relativeMem);
- if (Optimizer.LOG.isDebugEnabled()) {
- Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each subtask of " +
- node.getProgramOperator().getName() + ".");
- }
- }
-
- // assign memory to the local and global strategies of the channels
- for (Channel c : node.getInputs()) {
- if (c.getLocalStrategy().dams()) {
- final double relativeMem = 1.0 / this.memoryConsumerWeights;
- c.setRelativeMemoryLocalStrategy(relativeMem);
- if (Optimizer.LOG.isDebugEnabled()) {
- Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each local strategy " +
- "instance of " + c + ".");
- }
- }
- if (c.getTempMode() != TempMode.NONE) {
- final double relativeMem = 1.0 / this.memoryConsumerWeights;
- c.setRelativeTempMemory(relativeMem);
- if (Optimizer.LOG.isDebugEnabled()) {
- Optimizer.LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
- "table for " + c + ".");
- }
- }
- }
- }
- }
- return new OptimizedPlan(this.sources, this.sinks, this.allNodes, jobName, originalPlan);
- }
-
- @Override
- public boolean preVisit(PlanNode visitable) {
- // if we come here again, prevent a further descent
- if (!this.allNodes.add(visitable)) {
- return false;
- }
-
- if (visitable instanceof SinkPlanNode) {
- this.sinks.add((SinkPlanNode) visitable);
- }
- else if (visitable instanceof SourcePlanNode) {
- this.sources.add((SourcePlanNode) visitable);
- }
- else if (visitable instanceof BinaryUnionPlanNode) {
- BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
- if (unionNode.unionsStaticAndDynamicPath()) {
- unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
- }
- }
- else if (visitable instanceof BulkPartialSolutionPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof BulkIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- pspn.setContainingIterationNode((BulkIterationPlanNode) iteration);
- }
- else if (visitable instanceof WorksetPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final WorksetPlanNode wspn = (WorksetPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- wspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
- }
- else if (visitable instanceof SolutionSetPlanNode) {
- // tell the partial solution about the iteration node that contains it
- final SolutionSetPlanNode sspn = (SolutionSetPlanNode) visitable;
- final IterationPlanNode iteration = this.stackOfIterationNodes.peekLast();
-
- // sanity check!
- if (iteration == null || !(iteration instanceof WorksetIterationPlanNode)) {
- throw new CompilerException("Bug: Error finalizing the plan. " +
- "Cannot associate the node for a partial solutions with its containing iteration.");
- }
- sspn.setContainingIterationNode((WorksetIterationPlanNode) iteration);
- }
-
- // double-connect the connections. previously, only parents knew their children, because
- // one child candidate could have been referenced by multiple parents.
- for (Channel conn : visitable.getInputs()) {
- conn.setTarget(visitable);
- conn.getSource().addOutgoingChannel(conn);
- }
-
- for (Channel c : visitable.getBroadcastInputs()) {
- c.setTarget(visitable);
- c.getSource().addOutgoingChannel(c);
- }
-
- // count the memory consumption
- this.memoryConsumerWeights += visitable.getMemoryConsumerWeight();
- for (Channel c : visitable.getInputs()) {
- if (c.getLocalStrategy().dams()) {
- this.memoryConsumerWeights++;
- }
- if (c.getTempMode() != TempMode.NONE) {
- this.memoryConsumerWeights++;
- }
- }
- for (Channel c : visitable.getBroadcastInputs()) {
- if (c.getLocalStrategy().dams()) {
- this.memoryConsumerWeights++;
- }
- if (c.getTempMode() != TempMode.NONE) {
- this.memoryConsumerWeights++;
- }
- }
-
- // pass the visitor to the iteration's step function
- if (visitable instanceof IterationPlanNode) {
- // push the iteration node onto the stack
- final IterationPlanNode iterNode = (IterationPlanNode) visitable;
- this.stackOfIterationNodes.addLast(iterNode);
-
- // recurse
- ((IterationPlanNode) visitable).acceptForStepFunction(this);
-
- // pop the iteration node from the stack
- this.stackOfIterationNodes.removeLast();
- }
- return true;
- }
-
- @Override
- public void postVisit(PlanNode visitable) {}
-}
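
The memory assignment above boils down to dividing each consumer's weight by the plan-wide total weight, so the fractions of all consumers sum to 1.0. A toy illustration of the arithmetic (the weights are made up):

    int totalWeight = 2 + 1 + 1;              // e.g. hash join (2) + damming sort strategy (1) + temp barrier (1)
    double joinShare = 2.0 / totalWeight;     // 0.50 of the managed memory per subtask
    double sortShare = 1.0 / totalWeight;     // 0.25
    double tempShare = 1.0 / totalWeight;     // 0.25
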
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
deleted file mode 100644
index c0dc4dd..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StaticDynamicPathIdentifier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that goes over the program data flow of an iteration and marks the nodes
- * that depend on the partial solution (the data set recomputed in each iteration) as "dynamic"
- * and the other nodes as "static".
- */
-public class StaticDynamicPathIdentifier implements Visitor<OptimizerNode> {
-
- private final Set<OptimizerNode> seenBefore = new HashSet<OptimizerNode>();
-
- private final int costWeight;
-
- public StaticDynamicPathIdentifier(int costWeight) {
- this.costWeight = costWeight;
- }
-
- @Override
- public boolean preVisit(OptimizerNode visitable) {
- return this.seenBefore.add(visitable);
- }
-
- @Override
- public void postVisit(OptimizerNode visitable) {
- visitable.identifyDynamicPath(this.costWeight);
-
- // check that there is no nested iteration on the dynamic path
- if (visitable.isOnDynamicPath() && visitable instanceof IterationNode) {
- throw new CompilerException("Nested iterations are currently not supported.");
- }
- }
-}
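
The preVisit() guard above leans on the contract of Set.add(), which returns false when the element is already present; that is what prunes repeated descents into sub-DAGs reachable from more than one parent. A standalone illustration of the idiom (plain JDK, no Flink types):

    import java.util.HashSet;
    import java.util.Set;

    public class VisitOnce {
        public static void main(String[] args) {
            Set<String> seen = new HashSet<String>();
            System.out.println(seen.add("node"));  // true  -> first visit, descend
            System.out.println(seen.add("node"));  // false -> already seen, prune
        }
    }
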
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
deleted file mode 100644
index d359490..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/StepFunctionValidator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.traversals;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.base.DeltaIterationBase;
-import org.apache.flink.util.Visitor;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A traversal that checks if the Workset of a delta iteration is used in the data flow
- * of its step function.
- */
-public class StepFunctionValidator implements Visitor<Operator<?>> {
-
- private final Set<Operator<?>> seenBefore = new HashSet<Operator<?>>();
-
- private boolean foundWorkset;
-
- @Override
- public boolean preVisit(Operator<?> visitable) {
- if (visitable instanceof DeltaIterationBase.WorksetPlaceHolder) {
- foundWorkset = true;
- }
-
- return (!foundWorkset) && seenBefore.add(visitable);
- }
-
- @Override
- public void postVisit(Operator<?> visitable) {}
-
- public boolean hasFoundWorkset() {
- return foundWorkset;
- }
-}
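
A sketch of how such a validator might be applied to a delta iteration (hypothetical usage; the deltaIteration variable and the error handling are illustrative, not part of this commit):

    StepFunctionValidator validator = new StepFunctionValidator();
    deltaIteration.getNextWorkset().accept(validator);

    if (!validator.hasFoundWorkset()) {
        // a step function that never reads the workset is almost certainly a program error
        throw new CompilerException("The workset of the delta iteration is not used in its step function.");
    }
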
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
deleted file mode 100644
index cd8766c..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/traversals/package-info.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains the various traversals over the program plan and the
- * optimizer DAG (directed acyclic graph) that are made in the course of
- * the optimization.
- *
- * The traversals are mostly implemented as a {@link org.apache.flink.util.Visitor} that
- * traverses the program flow.
- */
-package org.apache.flink.optimizer.traversals;
\ No newline at end of file
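
For reference, the Visitor contract that these traversals implement is a small two-callback interface, roughly (paraphrased from org.apache.flink.util.Visitor):

    public interface Visitor<T extends Visitable<T>> {
        boolean preVisit(T visitable);  // returning false prunes the descent below this node
        void postVisit(T visitable);    // called on the way back up the traversal
    }
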
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
deleted file mode 100644
index 5110849..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpBinaryUdfOp.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.BinaryOperatorInformation;
-import org.apache.flink.api.common.operators.DualInputOperator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpBinaryUdfOp<OUT> extends DualInputOperator<OUT, OUT, OUT, NoOpFunction> implements RecordOperator {
-
- public NoOpBinaryUdfOp(TypeInformation<OUT> type) {
- super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), new BinaryOperatorInformation<OUT, OUT, OUT>(type, type, type), "NoContract");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return (Class<? extends Key<?>>[]) new Class[0];
- }
-
- @Override
- protected List<OUT> executeOnCollections(List<OUT> inputData1, List<OUT> inputData2, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
- throw new UnsupportedOperationException();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
deleted file mode 100644
index cc4a4d6..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/NoOpUnaryUdfOp.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.util;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.functions.util.NoOpFunction;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.types.Key;
-
-
-public class NoOpUnaryUdfOp<OUT> extends SingleInputOperator<OUT, OUT, NoOpFunction> implements RecordOperator {
-
- @SuppressWarnings("rawtypes")
- public static final NoOpUnaryUdfOp INSTANCE = new NoOpUnaryUdfOp();
-
- private NoOpUnaryUdfOp() {
- // pass null for the operator info here, because getOperatorInfo() is overridden
- // to derive the output type from the input operator
- super(new UserCodeClassWrapper<NoOpFunction>(NoOpFunction.class), null, "");
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Class<? extends Key<?>>[] getKeyClasses() {
- return (Class<? extends Key<?>>[]) new Class[0];
- }
-
- @Override
- public UnaryOperatorInformation<OUT, OUT> getOperatorInfo() {
- TypeInformation<OUT> previousOut = input.getOperatorInfo().getOutputType();
- return new UnaryOperatorInformation<OUT, OUT>(previousOut, previousOut);
- }
-
- @Override
- protected List<OUT> executeOnCollections(List<OUT> inputData, RuntimeContext runtimeContext, ExecutionConfig executionConfig) {
- return inputData;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
deleted file mode 100644
index d8f33a2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/util/Utils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.util;
-
-import java.util.Arrays;
-
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.CompilerException;
-
-
-/**
- * Utility methods for the optimizer, e.g. for creating orderings over sets of fields.
- */
-public class Utils {
- public static final FieldList createOrderedFromSet(FieldSet set) {
- if (set instanceof FieldList) {
- return (FieldList) set;
- } else {
- final int[] cols = set.toArray();
- Arrays.sort(cols);
- return new FieldList(cols);
- }
- }
-
- public static final Ordering createOrdering(FieldList fields, boolean[] directions) {
- final Ordering o = new Ordering();
- for (int i = 0; i < fields.size(); i++) {
- o.appendOrdering(fields.get(i), null, directions == null || directions[i] ? Order.ASCENDING : Order.DESCENDING);
- }
- return o;
- }
-
- public static final Ordering createOrdering(FieldList fields) {
- final Ordering o = new Ordering();
- for (int i = 0; i < fields.size(); i++) {
- o.appendOrdering(fields.get(i), null, Order.ANY);
- }
- return o;
- }
-
- public static boolean[] getDirections(Ordering o, int numFields) {
- final boolean[] dirs = o.getFieldSortDirections();
- if (dirs.length == numFields) {
- return dirs;
- } else if (dirs.length > numFields) {
- final boolean[] subSet = new boolean[numFields];
- System.arraycopy(dirs, 0, subSet, 0, numFields);
- return subSet;
- } else {
- throw new CompilerException("The ordering has fewer sort directions than the requested number of fields.");
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * No instantiation.
- */
- private Utils() {}
-}
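
For illustration, the ordering helpers above compose as follows (a minimal sketch; the field indices and sort directions are made up):

    // normalize an unordered field set into a sorted field list: {2, 0} -> [0, 2]
    FieldList fields = Utils.createOrderedFromSet(new FieldSet(new int[] {2, 0}));

    // build an ordering over the list: field 0 ascending, field 2 descending
    Ordering ordering = Utils.createOrdering(fields, new boolean[] {true, false});
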
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
deleted file mode 100644
index 1e4bafb..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/AdditionalOperatorsTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.CrossWithLargeOperator;
-import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
-/**
-* Tests that validate optimizer choices when using operators that request specific
-* execution strategies.
-*/
-@SuppressWarnings({"serial", "deprecation"})
-public class AdditionalOperatorsTest extends CompilerTestBase {
-
- @Test
- public void testCrossWithSmall() {
- // construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-
- CrossOperator cross = CrossWithSmallOperator.builder(new DummyCrossStub())
- .input1(source1).input2(source2)
- .name("Cross").build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
- try {
- OptimizedPlan oPlan = compileNoStats(plan);
- OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
-
- DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
- Channel in1 = crossPlanNode.getInput1();
- Channel in2 = crossPlanNode.getInput2();
-
- assertEquals(ShipStrategyType.FORWARD, in1.getShipStrategy());
- assertEquals(ShipStrategyType.BROADCAST, in2.getShipStrategy());
- } catch(CompilerException ce) {
- ce.printStackTrace();
- fail("The pact compiler is unable to compile this plan correctly.");
- }
- }
-
- @Test
- public void testCrossWithLarge() {
- // construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE, "Source 2");
-
- CrossOperator cross= CrossWithLargeOperator.builder(new DummyCrossStub())
- .input1(source1).input2(source2)
- .name("Cross").build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cross, "Sink");
-
- Plan plan = new Plan(sink);
- plan.setDefaultParallelism(DEFAULT_PARALLELISM);
-
-
- try {
- OptimizedPlan oPlan = compileNoStats(plan);
- OptimizerPlanNodeResolver resolver = new OptimizerPlanNodeResolver(oPlan);
-
- DualInputPlanNode crossPlanNode = resolver.getNode("Cross");
- Channel in1 = crossPlanNode.getInput1();
- Channel in2 = crossPlanNode.getInput2();
-
- assertEquals(ShipStrategyType.BROADCAST, in1.getShipStrategy());
- assertEquals(ShipStrategyType.FORWARD, in2.getShipStrategy());
- } catch(CompilerException ce) {
- ce.printStackTrace();
- fail("The pact compiler is unable to compile this plan correctly.");
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
deleted file mode 100644
index 916aa27..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BranchingPlansCompilerTest.java
+++ /dev/null
@@ -1,1039 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.optimizer.testfunctions.DummyCoGroupFunction;
-import org.apache.flink.optimizer.testfunctions.SelectOneReducer;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.java.record.operators.BulkIteration;
-import org.apache.flink.api.java.record.operators.CoGroupOperator;
-import org.apache.flink.api.java.record.operators.CrossOperator;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.optimizer.testfunctions.DummyFlatJoinFunction;
-import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
-import org.apache.flink.optimizer.testfunctions.IdentityKeyExtractor;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-import org.apache.flink.optimizer.testfunctions.Top1GroupReducer;
-import org.apache.flink.optimizer.util.DummyCoGroupStub;
-import org.apache.flink.optimizer.util.DummyCrossStub;
-import org.apache.flink.optimizer.util.DummyInputFormat;
-import org.apache.flink.optimizer.util.DummyMatchStub;
-import org.apache.flink.optimizer.util.DummyNonPreservingMatchStub;
-import org.apache.flink.optimizer.util.DummyOutputFormat;
-import org.apache.flink.optimizer.util.IdentityMap;
-import org.apache.flink.optimizer.util.IdentityReduce;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class BranchingPlansCompilerTest extends CompilerTestBase {
-
-
- @Test
- public void testCostComputationWithMultipleDataSinks() {
- final int SINKS = 5;
-
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- DataSet<Long> source = env.generateSequence(1, 10000);
-
- DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
- DataSet<Long> mappedC = source.map(new IdentityMapper<Long>());
-
- for (int sink = 0; sink < SINKS; sink++) {
- mappedA.output(new DiscardingOutputFormat<Long>());
- mappedC.output(new DiscardingOutputFormat<Long>());
- }
-
- Plan plan = env.createProgramPlan("Plans With Multiple Data Sinks");
- OptimizedPlan oPlan = compileNoStats(plan);
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- /**
- *
- * <pre>
- * (SRC A)
- * |
- * (MAP A)
- * / \
- * (MAP B) (MAP C)
- * / / \
- * (SINK A) (SINK B) (SINK C)
- * </pre>
- */
- @SuppressWarnings("unchecked")
- @Test
- public void testBranchingWithMultipleDataSinks2() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- DataSet<Long> source = env.generateSequence(1, 10000);
-
- DataSet<Long> mappedA = source.map(new IdentityMapper<Long>());
- DataSet<Long> mappedB = mappedA.map(new IdentityMapper<Long>());
- DataSet<Long> mappedC = mappedA.map(new IdentityMapper<Long>());
-
- mappedB.output(new DiscardingOutputFormat<Long>());
- mappedC.output(new DiscardingOutputFormat<Long>());
- mappedC.output(new DiscardingOutputFormat<Long>());
-
- Plan plan = env.createProgramPlan();
- Set<Operator<?>> sinks = new HashSet<Operator<?>>(plan.getDataSinks());
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // ---------- check the optimizer plan ----------
-
- // number of sinks
- assertEquals("Wrong number of data sinks.", 3, oPlan.getDataSinks().size());
-
- // remove matching sinks to check relation
- for (SinkPlanNode sink : oPlan.getDataSinks()) {
- assertTrue(sinks.remove(sink.getProgramOperator()));
- }
- assertTrue(sinks.isEmpty());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
-
- /**
- * <pre>
- * SINK
- * |
- * COGROUP
- * +---/ \----+
- * / \
- * / MATCH10
- * / | \
- * / | MATCH9
- * MATCH5 | | \
- * | \ | | MATCH8
- * | MATCH4 | | | \
- * | | \ | | | MATCH7
- * | | MATCH3 | | | | \
- * | | | \ | | | | MATCH6
- * | | | MATCH2 | | | | | |
- * | | | | \ +--+--+--+--+--+
- * | | | | MATCH1 MAP
- * \ | | | | | /-----------/
- * (DATA SOURCE ONE)
- * </pre>
- */
- @Test
- public void testBranchingSourceMultipleTimes() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- DataSet<Tuple2<Long, Long>> source = env.generateSequence(1, 10000000)
- .map(new Duplicator<Long>());
-
- DataSet<Tuple2<Long, Long>> joined1 = source.join(source).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined2 = source.join(joined1).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined3 = source.join(joined2).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined4 = source.join(joined3).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined5 = source.join(joined4).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> mapped = source.map(
- new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
- @Override
- public Tuple2<Long, Long> map(Tuple2<Long, Long> value) {
- return null;
- }
- });
-
- DataSet<Tuple2<Long, Long>> joined6 = mapped.join(mapped).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined7 = mapped.join(joined6).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined8 = mapped.join(joined7).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined9 = mapped.join(joined8).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined10 = mapped.join(joined9).where(0).equalTo(0)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
-
- joined5.coGroup(joined10)
- .where(1).equalTo(1)
- .with(new DummyCoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>())
-
- .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
- Plan plan = env.createProgramPlan();
- OptimizedPlan oPlan = compileNoStats(plan);
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- *
- * <pre>
- *
- * (SINK A)
- * | (SINK B) (SINK C)
- * CROSS / /
- * / \ | +------+
- * / \ | /
- * REDUCE MATCH2
- * | +---/ \
- * \ / |
- * MAP |
- * | |
- * COGROUP MATCH1
- * / \ / \
- * (SRC A) (SRC B) (SRC C)
- * </pre>
- */
- @Test
- public void testBranchingWithMultipleDataSinks() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- DataSet<Tuple2<Long, Long>> sourceA = env.generateSequence(1, 10000000)
- .map(new Duplicator<Long>());
-
- DataSet<Tuple2<Long, Long>> sourceB = env.generateSequence(1, 10000000)
- .map(new Duplicator<Long>());
-
- DataSet<Tuple2<Long, Long>> sourceC = env.generateSequence(1, 10000000)
- .map(new Duplicator<Long>());
-
- DataSet<Tuple2<Long, Long>> mapped = sourceA.coGroup(sourceB)
- .where(0).equalTo(1)
- .with(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
- @Override
- public void coGroup(Iterable<Tuple2<Long, Long>> first,
- Iterable<Tuple2<Long, Long>> second,
- Collector<Tuple2<Long, Long>> out) {
- }
- })
- .map(new IdentityMapper<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined = sourceB.join(sourceC)
- .where(0).equalTo(1)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> joined2 = mapped.join(joined)
- .where(1).equalTo(1)
- .with(new DummyFlatJoinFunction<Tuple2<Long, Long>>());
-
- DataSet<Tuple2<Long, Long>> reduced = mapped
- .groupBy(1)
- .reduceGroup(new Top1GroupReducer<Tuple2<Long, Long>>());
-
- reduced.cross(joined2)
- .output(new DiscardingOutputFormat<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>>());
-
- joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
- joined2.output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
-
- Plan plan = env.createProgramPlan();
- OptimizedPlan oPlan = compileNoStats(plan);
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void testBranchEachContractType() {
- try {
- // construct the plan
- FileDataSource sourceA = new FileDataSource(new DummyInputFormat(), "file:///test/file1", "Source A");
- FileDataSource sourceB = new FileDataSource(new DummyInputFormat(), "file:///test/file2", "Source B");
- FileDataSource sourceC = new FileDataSource(new DummyInputFormat(), "file:///test/file3", "Source C");
-
- MapOperator map1 = MapOperator.builder(new IdentityMap()).input(sourceA).name("Map 1").build();
-
- ReduceOperator reduce1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(map1)
- .name("Reduce 1")
- .build();
-
- JoinOperator match1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(sourceB, sourceB, sourceC)
- .input2(sourceC)
- .name("Match 1")
- .build();
- CoGroupOperator cogroup1 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(sourceA)
- .input2(sourceB)
- .name("CoGroup 1")
- .build();
-
- CrossOperator cross1 = CrossOperator.builder(new DummyCrossStub())
- .input1(reduce1)
- .input2(cogroup1)
- .name("Cross 1")
- .build();
-
-
- CoGroupOperator cogroup2 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cross1)
- .input2(cross1)
- .name("CoGroup 2")
- .build();
-
- CoGroupOperator cogroup3 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(map1)
- .input2(match1)
- .name("CoGroup 3")
- .build();
-
-
- MapOperator map2 = MapOperator.builder(new IdentityMap()).input(cogroup3).name("Map 2").build();
-
- CoGroupOperator cogroup4 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(map2)
- .input2(match1)
- .name("CoGroup 4")
- .build();
-
- CoGroupOperator cogroup5 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cogroup2)
- .input2(cogroup1)
- .name("CoGroup 5")
- .build();
-
- CoGroupOperator cogroup6 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(reduce1)
- .input2(cogroup4)
- .name("CoGroup 6")
- .build();
-
- CoGroupOperator cogroup7 = CoGroupOperator.builder(new DummyCoGroupStub(), IntValue.class, 0,0)
- .input1(cogroup5)
- .input2(cogroup6)
- .name("CoGroup 7")
- .build();
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, cogroup7);
- sink.addInput(sourceA);
- sink.addInput(cogroup3);
- sink.addInput(cogroup4);
- sink.addInput(cogroup1);
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Branching of each contract type");
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
-
- //Compile plan to verify that no error is thrown
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
-
- @Test
- public void testBranchingUnion() {
- try {
- // construct the plan
- FileDataSource source1 = new FileDataSource(new DummyInputFormat(), IN_FILE);
- FileDataSource source2 = new FileDataSource(new DummyInputFormat(), IN_FILE);
-
- JoinOperator mat1 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(source1)
- .input2(source2)
- .name("Match 1")
- .build();
-
- MapOperator ma1 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map1").build();
-
- ReduceOperator r1 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(ma1)
- .name("Reduce 1")
- .build();
-
- ReduceOperator r2 = ReduceOperator.builder(new IdentityReduce(), IntValue.class, 0)
- .input(mat1)
- .name("Reduce 2")
- .build();
-
- MapOperator ma2 = MapOperator.builder(new IdentityMap()).input(mat1).name("Map 2").build();
-
- MapOperator ma3 = MapOperator.builder(new IdentityMap()).input(ma2).name("Map 3").build();
-
- @SuppressWarnings("unchecked")
- JoinOperator mat2 = JoinOperator.builder(new DummyMatchStub(), IntValue.class, 0, 0)
- .input1(r1, r2, ma2, ma3)
- .input2(ma2)
- .name("Match 2")
- .build();
- mat2.setParameter(Optimizer.HINT_LOCAL_STRATEGY, Optimizer.HINT_LOCAL_STRATEGY_MERGE);
-
- FileDataSink sink = new FileDataSink(new DummyOutputFormat(), OUT_FILE, mat2);
-
-
- // return the PACT plan
- Plan plan = new Plan(sink, "Branching Union");
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
-
- //Compile plan to verify that no error is thrown
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- *
- * <pre>
- * (SRC A)
- * / \
- * (SINK A) (SINK B)
- * </pre>
- */
- @Test
- public void testBranchingWithMultipleDataSinksSmall() {
- try {
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- FileDataSink sinkA = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA);
- FileDataSink sinkB = new FileDataSink(DummyOutputFormat.class, out2Path, sourceA);
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sinkA);
- sinks.add(sinkB);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Plans With Multiple Data Sinks");
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- // ---------- check the optimizer plan ----------
-
- // number of sinks
- Assert.assertEquals("Wrong number of data sinks.", 2, oPlan.getDataSinks().size());
-
- // sinks contain all sink paths
- Set<String> allSinks = new HashSet<String>();
- allSinks.add(out1Path);
- allSinks.add(out2Path);
-
- for (SinkPlanNode n : oPlan.getDataSinks()) {
- String path = ((FileDataSink) n.getSinkNode().getOperator()).getFilePath();
- Assert.assertTrue("Invalid data sink.", allSinks.remove(path));
- }
-
- // ---------- compile plan to nephele job graph to verify that no error is thrown ----------
-
- JobGraphGenerator jobGen = new JobGraphGenerator();
- jobGen.compileJobGraph(oPlan);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- *
- * <pre>
- * (SINK 3) (SINK 1) (SINK 2) (SINK 4)
- * \ / \ /
- * (SRC A) (SRC B)
- * </pre>
- *
- * NOTE: This case is currently not caught by the compiler. We should enable the test once it is caught.
- */
- @Test
- public void testBranchingDisjointPlan() {
- // construct the plan
- final String out1Path = "file:///test/1";
- final String out2Path = "file:///test/2";
- final String out3Path = "file:///test/3";
- final String out4Path = "file:///test/4";
-
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE);
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE);
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, out1Path, sourceA, "1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, out2Path, sourceB, "2");
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, out3Path, sourceA, "3");
- FileDataSink sink4 = new FileDataSink(DummyOutputFormat.class, out4Path, sourceB, "4");
-
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
- sinks.add(sink4);
-
- // return the PACT plan
- Plan plan = new Plan(sinks, "Disjoint plan with multiple data sinks and branches");
- compileNoStats(plan);
- }
-
- @Test
- public void testBranchAfterIteration() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(sourceA);
- iteration.setMaximumNumberOfIterations(10);
-
- MapOperator mapper = MapOperator.builder(IdentityMap.class).name("Mapper").input(iteration.getPartialSolution()).build();
- iteration.setNextPartialSolution(mapper);
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 1");
-
- MapOperator postMap = MapOperator.builder(IdentityMap.class).name("Post Iteration Mapper")
- .input(iteration).build();
-
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink 2");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
-
- Plan plan = new Plan(sinks);
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testBranchBeforeIteration() {
- FileDataSource source1 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource source2 = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(source2);
- iteration.setMaximumNumberOfIterations(10);
-
- MapOperator inMap = MapOperator.builder(new IdentityMap())
- .input(source1)
- .name("In Iteration Map")
- .setBroadcastVariable("BC", iteration.getPartialSolution())
- .build();
-
- iteration.setNextPartialSolution(inMap);
-
- MapOperator postMap = MapOperator.builder(new IdentityMap())
- .input(source1)
- .name("Post Iteration Map")
- .setBroadcastVariable("BC", iteration)
- .build();
-
- FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, postMap, "Sink");
-
- Plan plan = new Plan(sink);
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * Test to ensure that sourceA is the same node both inside and outside of the iteration.
- *
- * <pre>
- * (SRC A) (SRC B)
- * / \ / \
- * (SINK 1) (ITERATION) | (SINK 2)
- * / \ /
- * (SINK 3) (CROSS => NEXT PARTIAL SOLUTION)
- * </pre>
- */
- @Test
- public void testClosure() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceB, "Sink 2");
-
- BulkIteration iteration = new BulkIteration("Loop");
- iteration.setInput(sourceA);
- iteration.setMaximumNumberOfIterations(10);
-
- CrossOperator stepFunction = CrossOperator.builder(DummyCrossStub.class).name("StepFunction").
- input1(iteration.getPartialSolution()).
- input2(sourceB).
- build();
-
- iteration.setNextPartialSolution(stepFunction);
-
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
-
- Plan plan = new Plan(sinks);
-
- try{
- compileNoStats(plan);
- }catch(Exception e){
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * <pre>
- * (SRC A) (SRC B) (SRC C)
- * / \ / / \
- * (SINK 1) (DELTA ITERATION) | (SINK 2)
- * / | \ /
- * (SINK 3) | (CROSS => NEXT WORKSET)
- * | |
- * (JOIN => SOLUTION SET DELTA)
- * </pre>
- */
- @Test
- public void testClosureDeltaIteration() {
- FileDataSource sourceA = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 1");
- FileDataSource sourceB = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 2");
- FileDataSource sourceC = new FileDataSource(DummyInputFormat.class, IN_FILE, "Source 3");
-
- FileDataSink sink1 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceA, "Sink 1");
- FileDataSink sink2 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, sourceC, "Sink 2");
-
- DeltaIteration iteration = new DeltaIteration(0, "Loop");
- iteration.setInitialSolutionSet(sourceA);
- iteration.setInitialWorkset(sourceB);
- iteration.setMaximumNumberOfIterations(10);
-
- CrossOperator nextWorkset = CrossOperator.builder(DummyCrossStub.class).name("Next workset").
- input1(iteration.getWorkset()).
- input2(sourceC).
- build();
-
- JoinOperator solutionSetDelta = JoinOperator.builder(DummyMatchStub.class, LongValue.class,0,0).
- name("Next solution set.").
- input1(nextWorkset).
- input2(iteration.getSolutionSet()).
- build();
-
- iteration.setNextWorkset(nextWorkset);
- iteration.setSolutionSetDelta(solutionSetDelta);
-
- FileDataSink sink3 = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Sink 3");
-
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink1);
- sinks.add(sink2);
- sinks.add(sink3);
-
- Plan plan = new Plan(sinks);
-
- try{
- compileNoStats(plan);
- }catch(Exception e){
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * <pre>
- * +----Iteration-------+
- * | |
- * /---------< >---------join-----< >---sink
- * / (Solution)| / |
- * / | / |
- * /--map-------< >----\ / /--|
- * / (Workset)| \ / / |
- * src-map | join------/ |
- * \ | / |
- * \ +-----/--------------+
- * \ /
- * \--reduce-------/
- * </pre>
- */
- @Test
- public void testDeltaIterationWithStaticInput() {
- FileDataSource source = new FileDataSource(DummyInputFormat.class, IN_FILE, "source");
-
- MapOperator mappedSource = MapOperator.builder(IdentityMap.class).
- input(source).
- name("Identity mapped source").
- build();
-
- ReduceOperator reducedSource = ReduceOperator.builder(IdentityReduce.class).
- input(source).
- name("Identity reduce source").
- build();
-
- DeltaIteration iteration = new DeltaIteration(0, "Loop");
- iteration.setMaximumNumberOfIterations(10);
- iteration.setInitialSolutionSet(source);
- iteration.setInitialWorkset(mappedSource);
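- // 'source' branches three ways: into the solution set, the mapped workset, and the reduced static join input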
-
- JoinOperator nextWorkset = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, 0).
- input1(iteration.getWorkset()).
- input2(reducedSource).
- name("Next work set").
- build();
-
- JoinOperator solutionSetDelta = JoinOperator.builder(DummyNonPreservingMatchStub.class, IntValue.class, 0, 0).
- input1(iteration.getSolutionSet()).
- input2(nextWorkset).
- name("Solution set delta").
- build();
-
- iteration.setNextWorkset(nextWorkset);
- iteration.setSolutionSetDelta(solutionSetDelta);
-
- FileDataSink sink = new FileDataSink(DummyOutputFormat.class, OUT_FILE, iteration, "Iteration sink");
- List<FileDataSink> sinks = new ArrayList<FileDataSink>();
- sinks.add(sink);
-
- Plan plan = new Plan(sinks);
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- /**
- * <pre>
- * +---------Iteration-------+
- * | |
- * /--map--< >----\ |
- * / | \ /-------< >---sink
- * src-map | join------/ |
- * \ | / |
- * \ +-----/-------------------+
- * \ /
- * \--reduce--/
- * </pre>
- */
- @Test
- public void testIterationWithStaticInput() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<Long> source = env.generateSequence(1, 1000000);
-
- DataSet<Long> mapped = source.map(new IdentityMapper<Long>());
-
- DataSet<Long> reduced = source.groupBy(new IdentityKeyExtractor<Long>()).reduce(new SelectOneReducer<Long>());
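- // 'reduced' is derived from the same source but stays constant inside the iteration (static path)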
-
- IterativeDataSet<Long> iteration = mapped.iterate(10);
- iteration.closeWith(
- iteration.join(reduced)
- .where(new IdentityKeyExtractor<Long>())
- .equalTo(new IdentityKeyExtractor<Long>())
- .with(new DummyFlatJoinFunction<Long>()))
- .output(new DiscardingOutputFormat<Long>());
-
- compileNoStats(env.createProgramPlan());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBranchingBroadcastVariable() {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<String> input1 = env.readTextFile(IN_FILE).name("source1");
- DataSet<String> input2 = env.readTextFile(IN_FILE).name("source2");
- DataSet<String> input3 = env.readTextFile(IN_FILE).name("source3");
-
- DataSet<String> result1 = input1
- .map(new IdentityMapper<String>())
- .reduceGroup(new Top1GroupReducer<String>())
- .withBroadcastSet(input3, "bc");
-
- DataSet<String> result2 = input2
- .map(new IdentityMapper<String>())
- .reduceGroup(new Top1GroupReducer<String>())
- .withBroadcastSet(input3, "bc");
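- // input3 is broadcast into both branches; result1 is reused below as join input and as broadcast set "bc3"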
-
- result1.join(result2)
- .where(new IdentityKeyExtractor<String>())
- .equalTo(new IdentityKeyExtractor<String>())
- .with(new RichJoinFunction<String, String, String>() {
- @Override
- public String join(String first, String second) {
- return null;
- }
- })
- .withBroadcastSet(input3, "bc1")
- .withBroadcastSet(input1, "bc2")
- .withBroadcastSet(result1, "bc3")
- .print();
-
- Plan plan = env.createProgramPlan();
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testBCVariableClosure() {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
-
- DataSet<String> reduced = input
- .map(new IdentityMapper<String>())
- .reduceGroup(new Top1GroupReducer<String>());
-
-
- DataSet<String> initialSolution = input.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc");
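- // 'reduced' is used as a broadcast variable both outside the iteration (above) and inside its step function (below)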
-
-
- IterativeDataSet<String> iteration = initialSolution.iterate(100);
-
- iteration.closeWith(iteration.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "red"))
- .print();
-
- Plan plan = env.createProgramPlan();
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultipleIterations() {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
-
- DataSet<String> reduced = input
- .map(new IdentityMapper<String>())
- .reduceGroup(new Top1GroupReducer<String>());
-
- IterativeDataSet<String> iteration1 = input.iterate(100);
- IterativeDataSet<String> iteration2 = input.iterate(20);
- IterativeDataSet<String> iteration3 = input.iterate(17);
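- // 'input' branches into the reduce chain and three independent iterations, each of which broadcasts 'reduced'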
-
- iteration1.closeWith(iteration1.map(new IdentityMapper<String>()).withBroadcastSet(reduced, "bc1")).print();
- iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>()).withBroadcastSet(reduced, "bc2")).print();
- iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>()).withBroadcastSet(reduced, "bc3")).print();
-
- Plan plan = env.createProgramPlan();
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testMultipleIterationsWithClosureBCVars() {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<String> input = env.readTextFile(IN_FILE).name("source1");
-
- IterativeDataSet<String> iteration1 = input.iterate(100);
- IterativeDataSet<String> iteration2 = input.iterate(20);
- IterativeDataSet<String> iteration3 = input.iterate(17);
-
-
- iteration1.closeWith(iteration1.map(new IdentityMapper<String>())).print();
- iteration2.closeWith(iteration2.reduceGroup(new Top1GroupReducer<String>())).print();
- iteration3.closeWith(iteration3.reduceGroup(new IdentityGroupReducer<String>())).print();
-
- Plan plan = env.createProgramPlan();
-
- try {
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testBranchesOnlyInBCVariables1() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<Long> input = env.generateSequence(1, 10);
- DataSet<Long> bc_input = env.generateSequence(1, 10);
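- // the only branching in this plan happens via bc_input, which both mappers use solely as a broadcast set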
-
- input
- .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name1")
- .map(new IdentityMapper<Long>()).withBroadcastSet(bc_input, "name2")
- .print();
-
- Plan plan = env.createProgramPlan();
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBranchesOnlyInBCVariables2() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(100);
-
- DataSet<Tuple2<Long, Long>> input = env.generateSequence(1, 10).map(new Duplicator<Long>()).name("proper input");
-
- DataSet<Long> bc_input1 = env.generateSequence(1, 10).name("BC input 1");
- DataSet<Long> bc_input2 = env.generateSequence(1, 10).name("BC input 2");
-
- DataSet<Tuple2<Long, Long>> joinInput1 =
- input.map(new IdentityMapper<Tuple2<Long,Long>>())
- .withBroadcastSet(bc_input1.map(new IdentityMapper<Long>()), "bc1")
- .withBroadcastSet(bc_input2, "bc2");
-
- DataSet<Tuple2<Long, Long>> joinInput2 =
- input.map(new IdentityMapper<Tuple2<Long,Long>>())
- .withBroadcastSet(bc_input1, "bc1")
- .withBroadcastSet(bc_input2, "bc2");
-
- DataSet<Tuple2<Long, Long>> joinResult = joinInput1
- .join(joinInput2, JoinHint.REPARTITION_HASH_FIRST).where(0).equalTo(1)
- .with(new DummyFlatJoinFunction<Tuple2<Long,Long>>());
-
- input
- .map(new IdentityMapper<Tuple2<Long,Long>>())
- .withBroadcastSet(bc_input1, "bc1")
- .union(joinResult)
- .print();
-
- Plan plan = env.createProgramPlan();
- compileNoStats(plan);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- private static final class Duplicator<T> implements MapFunction<T, Tuple2<T, T>> {
-
- @Override
- public Tuple2<T, T> map(T value) {
- return new Tuple2<T, T>(value, value);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
deleted file mode 100644
index c7ad2da..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/BroadcastVariablePipelinebreakerTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.common.Plan;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.testfunctions.IdentityMapper;
-
-@SuppressWarnings("serial")
-public class BroadcastVariablePipelinebreakerTest extends CompilerTestBase {
-
- @Test
- public void testNoBreakerForIndependentVariable() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> source1 = env.fromElements("test");
- DataSet<String> source2 = env.fromElements("test");
-
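- // source2 is used only as a broadcast set and is independent of the mapper's input pipeline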
- source1.map(new IdentityMapper<String>()).withBroadcastSet(source2, "some name").print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-
- assertEquals(TempMode.NONE, mapper.getInput().getTempMode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testBreakerForDependentVariable() {
- try {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<String> source1 = env.fromElements("test");
-
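- // source1 feeds both the mapper chain and its broadcast set, so a pipeline breaker is required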
- source1.map(new IdentityMapper<String>()).map(new IdentityMapper<String>()).withBroadcastSet(source1, "some name").print();
-
- Plan p = env.createProgramPlan();
- OptimizedPlan op = compileNoStats(p);
-
- SinkPlanNode sink = op.getDataSinks().iterator().next();
- SingleInputPlanNode mapper = (SingleInputPlanNode) sink.getInput().getSource();
-
- assertEquals(TempMode.PIPELINE_BREAKER, mapper.getInput().getTempMode());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
deleted file mode 100644
index 3e7da6c..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CachedMatchStrategyCompilerTest.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
-* Tests that validate the optimizer's strategy choices when using hash joins inside of iterations.
-*/
-@SuppressWarnings("serial")
-public class CachedMatchStrategyCompilerTest extends CompilerTestBase {
-
- /**
- * This tests whether a HYBRIDHASH_BUILD_SECOND is correctly transformed to a HYBRIDHASH_BUILD_SECOND_CACHED
- * when it is inside of an iteration and on the static path
- */
- @Test
- public void testRightSide() {
- try {
-
- Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
-
- // verify correct join strategy
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
- assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
- assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- /**
- * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
- */
- @Test
- public void testRightSideCountercheck() {
- try {
-
- Plan plan = getTestPlanRightStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
-
- // verify correct join strategy
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST, innerJoin.getDriverStrategy());
- assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
- assertEquals(TempMode.CACHED, innerJoin.getInput2().getTempMode());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- /**
- * This tests whether a HYBRIDHASH_BUILD_FIRST is correctly transformed to a HYBRIDHASH_BUILD_FIRST_CACHED
- * when it is inside of an iteration and on the static path
- */
- @Test
- public void testLeftSide() {
- try {
-
- Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_FIRST);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
-
- // verify correct join strategy
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED, innerJoin.getDriverStrategy());
- assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
- assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- /**
- * This test makes sure that only a HYBRIDHASH on the static path is transformed to the cached variant
- */
- @Test
- public void testLeftSideCountercheck() {
- try {
-
- Plan plan = getTestPlanLeftStatic(Optimizer.HINT_LOCAL_STRATEGY_HASH_BUILD_SECOND);
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
-
- // verify correct join strategy
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND, innerJoin.getDriverStrategy());
- assertEquals(TempMode.CACHED, innerJoin.getInput1().getTempMode());
- assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- /**
- * This test simulates a join of a big left side with a small right side inside of an iteration, where the small side is on a static path.
- * Currently the best execution plan is a HYBRIDHASH_BUILD_SECOND_CACHED, where the small side is hashed and cached.
- * This test also makes sure that all relevant plans are correctly enumerated by the optimizer.
- */
- @Test
- public void testCorrectChoosing() {
- try {
-
- Plan plan = getTestPlanRightStatic("");
-
- SourceCollectorVisitor sourceCollector = new SourceCollectorVisitor();
- plan.accept(sourceCollector);
-
- for (GenericDataSourceBase<?, ?> s : sourceCollector.getSources()) {
- if (s.getName().equals("bigFile")) {
- this.setSourceStatistics(s, 10000000, 1000);
- }
- else if (s.getName().equals("smallFile")) {
- this.setSourceStatistics(s, 100, 100);
- }
- }
-
-
- OptimizedPlan oPlan = compileNoStats(plan);
-
- OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(oPlan);
- DualInputPlanNode innerJoin = resolver.getNode("DummyJoiner");
-
- // verify correct join strategy
- assertEquals(DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED, innerJoin.getDriverStrategy());
- assertEquals(TempMode.NONE, innerJoin.getInput1().getTempMode());
- assertEquals(TempMode.NONE, innerJoin.getInput2().getTempMode());
-
- new JobGraphGenerator().compileJobGraph(oPlan);
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Test errored: " + e.getMessage());
- }
- }
-
- private Plan getTestPlanRightStatic(String strategy) {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- DataSet<Tuple3<Long, Long, Long>> bigInput = env.readCsvFile("file://bigFile").types(Long.class, Long.class, Long.class).name("bigFile");
-
- DataSet<Tuple3<Long, Long, Long>> smallInput = env.readCsvFile("file://smallFile").types(Long.class, Long.class, Long.class).name("smallFile");
-
- IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
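- // smallInput is defined outside the iteration and therefore lies on the static path of the join below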
-
- Configuration joinStrategy = new Configuration();
- joinStrategy.setString(Optimizer.HINT_SHIP_STRATEGY, Optimizer.HINT_SHIP_STRATEGY_REPARTITION_HASH);
-
- if(strategy != "") {
- joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
- }
-
- DataSet<Tuple3<Long, Long, Long>> inner = iteration.join(smallInput).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-
- DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
-
- output.print();
-
- return env.createProgramPlan();
-
- }
-
- private Plan getTestPlanLeftStatic(String strategy) {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- env.setDegreeOfParallelism(DEFAULT_PARALLELISM);
-
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> bigInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L),
- new Tuple3<Long, Long, Long>(1L, 2L, 3L),new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Big");
-
- @SuppressWarnings("unchecked")
- DataSet<Tuple3<Long, Long, Long>> smallInput = env.fromElements(new Tuple3<Long, Long, Long>(1L, 2L, 3L)).name("Small");
-
- IterativeDataSet<Tuple3<Long, Long, Long>> iteration = bigInput.iterate(10);
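- // here the static (small) input is the first (left) input of the join below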
-
- Configuration joinStrategy = new Configuration();
- joinStrategy.setString(Optimizer.HINT_LOCAL_STRATEGY, strategy);
-
- DataSet<Tuple3<Long, Long, Long>> inner = smallInput.join(iteration).where(0).equalTo(0).with(new DummyJoiner()).name("DummyJoiner").withParameters(joinStrategy);
-
- DataSet<Tuple3<Long, Long, Long>> output = iteration.closeWith(inner);
-
- output.print();
-
- return env.createProgramPlan();
-
- }
-
- private static class DummyJoiner extends RichJoinFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>> {
-
- @Override
- public Tuple3<Long, Long, Long> join(Tuple3<Long, Long, Long> first,
- Tuple3<Long, Long, Long> second) throws Exception {
-
- return first;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java b/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
deleted file mode 100644
index eba07f1..0000000
--- a/flink-compiler/src/test/java/org/apache/flink/optimizer/CoGroupSolutionSetFirstTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer;
-
-import org.apache.flink.api.common.functions.RichCoGroupFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Visitor;
-
-@SuppressWarnings("serial")
-public class CoGroupSolutionSetFirstTest extends CompilerTestBase {
-
- public static class SimpleCGroup extends RichCoGroupFunction<Tuple1<Integer>, Tuple1<Integer>, Tuple1<Integer>> {
- @Override
- public void coGroup(Iterable<Tuple1<Integer>> first, Iterable<Tuple1<Integer>> second, Collector<Tuple1<Integer>> out) {}
- }
-
- public static class SimpleMap extends RichMapFunction<Tuple1<Integer>, Tuple1<Integer>> {
- @Override
- public Tuple1<Integer> map(Tuple1<Integer> value) throws Exception {
- return null;
- }
- }
-
- @Test
- public void testCoGroupSolutionSet() {
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Tuple1<Integer>> raw = env.readCsvFile(IN_FILE).types(Integer.class);
-
- DeltaIteration<Tuple1<Integer>, Tuple1<Integer>> iteration = raw.iterateDelta(raw, 1000, 0);
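- // delta iteration keyed on tuple field 0, running at most 1000 supersteps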
-
- DataSet<Tuple1<Integer>> test = iteration.getWorkset().map(new SimpleMap());
- DataSet<Tuple1<Integer>> delta = iteration.getSolutionSet().coGroup(test).where(0).equalTo(0).with(new SimpleCGroup());
- DataSet<Tuple1<Integer>> feedback = iteration.getWorkset().map(new SimpleMap());
- DataSet<Tuple1<Integer>> result = iteration.closeWith(delta, feedback);
-
- result.print();
-
- Plan plan = env.createProgramPlan();
- OptimizedPlan oPlan = null;
- try {
- oPlan = compileNoStats(plan);
- } catch (CompilerException e) {
- Assert.fail(e.getMessage());
- }
-
- oPlan.accept(new Visitor<PlanNode>() {
- @Override
- public boolean preVisit(PlanNode visitable) {
- if (visitable instanceof WorksetIterationPlanNode) {
- PlanNode deltaNode = ((WorksetIterationPlanNode) visitable).getSolutionSetDeltaPlanNode();
-
- // get the CoGroup
- DualInputPlanNode dpn = (DualInputPlanNode) deltaNode.getInputs().iterator().next().getSource();
- Channel in1 = dpn.getInput1();
- Channel in2 = dpn.getInput2();
-
- Assert.assertTrue(in1.getLocalProperties().getOrdering() == null);
- Assert.assertTrue(in2.getLocalProperties().getOrdering() != null);
- Assert.assertTrue(in2.getLocalProperties().getOrdering().getInvolvedIndexes().contains(0));
- Assert.assertTrue(in1.getShipStrategy() == ShipStrategyType.FORWARD);
- Assert.assertTrue(in2.getShipStrategy() == ShipStrategyType.PARTITION_HASH);
- return false;
- }
- return true;
- }
-
- @Override
- public void postVisit(PlanNode visitable) {}
- });
- }
-}