You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:07:12 UTC
[33/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
deleted file mode 100644
index 7ae35c3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-
-public class NoOpDescriptor extends OperatorDescriptorSingle {
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.UNARY_NO_OP;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "Pipe", in, DriverStrategy.UNARY_NO_OP);
- }
-
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- return Collections.singletonList(new RequestedGlobalProperties());
- }
-
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- return Collections.singletonList(new RequestedLocalProperties());
- }
-
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- return gProps;
- }
-
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
deleted file mode 100644
index c21593e..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-
-/**
- *
- */
-public abstract class OperatorDescriptorDual implements AbstractOperatorDescriptor {
-
- protected final FieldList keys1;
- protected final FieldList keys2;
-
- private List<GlobalPropertiesPair> globalProps;
- private List<LocalPropertiesPair> localProps;
-
- protected OperatorDescriptorDual() {
- this(null, null);
- }
-
- protected OperatorDescriptorDual(FieldList keys1, FieldList keys2) {
- this.keys1 = keys1;
- this.keys2 = keys2;
- }
-
- public List<GlobalPropertiesPair> getPossibleGlobalProperties() {
- if (this.globalProps == null) {
- this.globalProps = createPossibleGlobalProperties();
- }
-
- return this.globalProps;
- }
-
- public List<LocalPropertiesPair> getPossibleLocalProperties() {
- if (this.localProps == null) {
- this.localProps = createPossibleLocalProperties();
- }
-
- return this.localProps;
- }
-
- protected abstract List<GlobalPropertiesPair> createPossibleGlobalProperties();
-
- protected abstract List<LocalPropertiesPair> createPossibleLocalProperties();
-
- public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
- GlobalProperties produced1, GlobalProperties produced2);
-
- public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2);
-
- public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node);
-
- public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
-
- public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2);
-
- protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) {
-
- // check number of produced partitioning fields
- if(fields1.size() != fields2.size()) {
- return false;
- } else {
- return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
- }
- }
-
- protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) {
-
- // check number of produced partitioning fields
- if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) {
- return false;
- }
- else {
- for(int i=0; i<numRelevantFields; i++) {
- int pField1 = fields1.get(i);
- int pField2 = fields2.get(i);
- // check if position of both produced fields is the same in both requested fields
- int j;
- for(j=0; j<this.keys1.size(); j++) {
- if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) {
- break;
- }
- else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) {
- // do nothing
- }
- else {
- return false;
- }
- }
- if(j == this.keys1.size()) {
- throw new CompilerException("Fields were not found in key fields.");
- }
- }
- }
- return true;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public static final class GlobalPropertiesPair {
-
- private final RequestedGlobalProperties props1, props2;
-
- public GlobalPropertiesPair(RequestedGlobalProperties props1, RequestedGlobalProperties props2) {
- this.props1 = props1;
- this.props2 = props2;
- }
-
- public RequestedGlobalProperties getProperties1() {
- return this.props1;
- }
-
- public RequestedGlobalProperties getProperties2() {
- return this.props2;
- }
-
- @Override
- public int hashCode() {
- return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == GlobalPropertiesPair.class) {
- final GlobalPropertiesPair other = (GlobalPropertiesPair) obj;
-
- return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) &&
- (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2));
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "{" + this.props1 + " / " + this.props2 + "}";
- }
- }
-
- public static final class LocalPropertiesPair {
-
- private final RequestedLocalProperties props1, props2;
-
- public LocalPropertiesPair(RequestedLocalProperties props1, RequestedLocalProperties props2) {
- this.props1 = props1;
- this.props2 = props2;
- }
-
- public RequestedLocalProperties getProperties1() {
- return this.props1;
- }
-
- public RequestedLocalProperties getProperties2() {
- return this.props2;
- }
-
- @Override
- public int hashCode() {
- return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj.getClass() == LocalPropertiesPair.class) {
- final LocalPropertiesPair other = (LocalPropertiesPair) obj;
-
- return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) &&
- (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2));
- }
- return false;
- }
-
- @Override
- public String toString() {
- return "{" + this.props1 + " / " + this.props2 + "}";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
deleted file mode 100644
index c8be5d4..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-
-/**
- * Abstract base class for Operator descriptions which instantiates the node and sets the driver
- * strategy and the sorting and grouping keys. Returns possible local and global properties and
- * updates them after the operation has been performed.
- * @see org.apache.flink.compiler.dag.SingleInputNode
- */
-public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor {
-
- protected final FieldSet keys; // the set of key fields
- protected final FieldList keyList; // the key fields with ordered field positions
-
- private List<RequestedGlobalProperties> globalProps;
- private List<RequestedLocalProperties> localProps;
-
-
- protected OperatorDescriptorSingle() {
- this(null);
- }
-
- protected OperatorDescriptorSingle(FieldSet keys) {
- this.keys = keys;
- this.keyList = keys == null ? null : keys.toFieldList();
- }
-
-
- public List<RequestedGlobalProperties> getPossibleGlobalProperties() {
- if (this.globalProps == null) {
- this.globalProps = createPossibleGlobalProperties();
- }
- return this.globalProps;
- }
-
- public List<RequestedLocalProperties> getPossibleLocalProperties() {
- if (this.localProps == null) {
- this.localProps = createPossibleLocalProperties();
- }
- return this.localProps;
- }
-
- /**
- * Returns a list of global properties that are required by this operator descriptor.
- *
- * @return A list of global properties that are required by this operator descriptor.
- */
- protected abstract List<RequestedGlobalProperties> createPossibleGlobalProperties();
-
- /**
- * Returns a list of local properties that are required by this operator descriptor.
- *
- * @return A list of local properties that are required by this operator descriptor.
- */
- protected abstract List<RequestedLocalProperties> createPossibleLocalProperties();
-
- public abstract SingleInputPlanNode instantiate(Channel in, SingleInputNode node);
-
- /**
- * Returns the global properties which are present after the operator was applied on the
- * provided global properties.
- *
- * @param in The global properties on which the operator is applied.
- * @return The global properties which are valid after the operator has been applied.
- */
- public abstract GlobalProperties computeGlobalProperties(GlobalProperties in);
-
- /**
- * Returns the local properties which are present after the operator was applied on the
- * provided local properties.
- *
- * @param in The local properties on which the operator is applied.
- * @return The local properties which are valid after the operator has been applied.
- */
- public abstract LocalProperties computeLocalProperties(LocalProperties in);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
deleted file mode 100644
index 2bde29b..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.dag.GroupReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-public final class PartialGroupProperties extends OperatorDescriptorSingle {
-
- public PartialGroupProperties(FieldSet keys) {
- super(keys);
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.SORTED_GROUP_COMBINE;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- // create in input node for combine with same DOP as input node
- GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator());
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
- SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
- DriverStrategy.SORTED_GROUP_COMBINE);
- // sorting key info
- combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
- // set grouping comparator key info
- combiner.setDriverKeyInfo(this.keyList, 1);
-
- return combiner;
- }
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- return Collections.singletonList(new RequestedGlobalProperties());
- }
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- RequestedLocalProperties props = new RequestedLocalProperties();
- props.setGroupedFields(this.keys);
- return Collections.singletonList(props);
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
- gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
- {
- gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
- }
- gProps.clearUniqueFieldCombinations();
- return gProps;
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps.clearUniqueFieldSets();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
deleted file mode 100644
index 5bb51f3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.ReduceNode;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-
-public final class ReduceProperties extends OperatorDescriptorSingle {
-
- private final Partitioner<?> customPartitioner;
-
- public ReduceProperties(FieldSet keys) {
- this(keys, null);
- }
-
- public ReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) {
- super(keys);
- this.customPartitioner = customPartitioner;
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.SORTED_REDUCE;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
- (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
- {
- return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
- DriverStrategy.SORTED_REDUCE, this.keyList);
- }
- else {
- // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
- Channel toCombiner = new Channel(in.getSource());
- toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-
- // create an input node for combine with same DOP as input node
- ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
- combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
-
- SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
- "Combine ("+node.getOperator().getName()+")", toCombiner,
- DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
-
- combiner.setCosts(new Costs(0, 0));
- combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-
- Channel toReducer = new Channel(combiner);
- toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
- in.getShipStrategySortOrder(), in.getDataExchangeMode());
- toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
- return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
- DriverStrategy.SORTED_REDUCE, this.keyList);
- }
- }
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- RequestedGlobalProperties props = new RequestedGlobalProperties();
- if (customPartitioner == null) {
- props.setAnyPartitioning(this.keys);
- } else {
- props.setCustomPartitioned(this.keys, this.customPartitioner);
- }
- return Collections.singletonList(props);
- }
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- RequestedLocalProperties props = new RequestedLocalProperties();
- props.setGroupedFields(this.keys);
- return Collections.singletonList(props);
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
- gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
- {
- gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
- }
- gProps.clearUniqueFieldCombinations();
- return gProps;
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps.clearUniqueFieldSets();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
deleted file mode 100644
index 1dcd87d..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.dag.SingleInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class SolutionSetDeltaOperator extends OperatorDescriptorSingle {
-
- public SolutionSetDeltaOperator(FieldList partitioningFields) {
- super(partitioningFields);
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.UNARY_NO_OP;
- }
-
- @Override
- public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
- return new SingleInputPlanNode(node, "SolutionSet Delta", in, DriverStrategy.UNARY_NO_OP);
- }
-
- @Override
- protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
- RequestedGlobalProperties partProps = new RequestedGlobalProperties();
- partProps.setHashPartitioned(this.keyList);
- return Collections.singletonList(partProps);
- }
-
- @Override
- protected List<RequestedLocalProperties> createPossibleLocalProperties() {
- return Collections.singletonList(new RequestedLocalProperties());
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
- return gProps;
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties lProps) {
- return lProps;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
deleted file mode 100644
index 356836a..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
-
- public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
- super(keys1, keys2);
- }
-
- public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
- boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
- {
- super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
- }
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.MERGE;
- }
-
- @Override
- protected List<LocalPropertiesPair> createPossibleLocalProperties() {
- RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
- RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
- return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
- }
-
- @Override
- public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2)
- {
- int numRelevantFields = this.keys1.size();
-
- Ordering prod1 = produced1.getOrdering();
- Ordering prod2 = produced2.getOrdering();
-
- if (prod1 == null || prod2 == null) {
- throw new CompilerException("The given properties do not meet this operators requirements.");
- }
-
- // check that order of fields is equivalent
- if (!checkEquivalentFieldPositionsInKeyFields(
- prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
- return false;
- }
-
- // check that both inputs have the same directions of order
- for (int i = 0; i < numRelevantFields; i++) {
- if (prod1.getOrder(i) != prod2.getOrder(i)) {
- return false;
- }
- }
- return true;
- }
-
- @Override
- public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- boolean[] inputOrders = in1.getLocalProperties().getOrdering().getFieldSortDirections();
-
- if (inputOrders == null || inputOrders.length < this.keys1.size()) {
- throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
- } else if (inputOrders.length > this.keys1.size()) {
- boolean[] tmp = new boolean[this.keys1.size()];
- System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
- inputOrders = tmp;
- }
-
- return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
- LocalProperties comb = LocalProperties.combine(in1, in2);
- return comb.clearUniqueFieldSets();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
deleted file mode 100644
index c42cff2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.operators;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.SinkJoiner;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- *
- */
-public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
-
- @Override
- public DriverStrategy getStrategy() {
- return DriverStrategy.BINARY_NO_OP;
- }
-
- @Override
- protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
- // all properties are possible
- return Collections.singletonList(new GlobalPropertiesPair(
- new RequestedGlobalProperties(), new RequestedGlobalProperties()));
- }
-
- @Override
- protected List<LocalPropertiesPair> createPossibleLocalProperties() {
- // all properties are possible
- return Collections.singletonList(new LocalPropertiesPair(
- new RequestedLocalProperties(), new RequestedLocalProperties()));
- }
-
- @Override
- public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
- GlobalProperties produced1, GlobalProperties produced2) {
- return true;
- }
-
- @Override
- public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
- LocalProperties produced1, LocalProperties produced2) {
- return true;
- }
-
- @Override
- public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
- if (node instanceof SinkJoiner) {
- return new SinkJoinerPlanNode((SinkJoiner) node, in1, in2);
- } else {
- throw new CompilerException();
- }
- }
-
- @Override
- public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
- return new LocalProperties();
- }
-
- @Override
- public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
- return GlobalProperties.combine(in1, in2);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
deleted file mode 100644
index bf22fb3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-
-/**
- * A special subclass for the union to make it identifiable.
- */
-public class BinaryUnionPlanNode extends DualInputPlanNode {
-
- /**
- * @param template
- */
- public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) {
- super(template, "Union", in1, in2, DriverStrategy.UNION);
- }
-
- public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
- super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(),
- DriverStrategy.UNION_WITH_CACHED);
-
- this.globalProps = toSwapFrom.globalProps;
- this.localProps = toSwapFrom.localProps;
- this.nodeCosts = toSwapFrom.nodeCosts;
- this.cumulativeCosts = toSwapFrom.cumulativeCosts;
-
- setParallelism(toSwapFrom.getParallelism());
- }
-
- public BinaryUnionNode getOptimizerNode() {
- return (BinaryUnionNode) this.template;
- }
-
- public boolean unionsStaticAndDynamicPath() {
- return getInput1().isOnDynamicPath() != getInput2().isOnDynamicPath();
- }
-
- @Override
- public int getMemoryConsumerWeight() {
- return 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
deleted file mode 100644
index e79e2f3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-
-import java.util.HashMap;
-
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BulkIterationNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-public class BulkIterationPlanNode extends SingleInputPlanNode implements IterationPlanNode {
-
- private final BulkPartialSolutionPlanNode partialSolutionPlanNode;
-
- private final PlanNode rootOfStepFunction;
-
- private PlanNode rootOfTerminationCriterion;
-
- private TypeSerializerFactory<?> serializerForIterationChannel;
-
- // --------------------------------------------------------------------------------------------
-
- public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input,
- BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction)
- {
- super(template, nodeName, input, DriverStrategy.NONE);
- this.partialSolutionPlanNode = pspn;
- this.rootOfStepFunction = rootOfStepFunction;
-
- mergeBranchPlanMaps();
- }
-
- public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input,
- BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction, PlanNode rootOfTerminationCriterion)
- {
- this(template, nodeName, input, pspn, rootOfStepFunction);
- this.rootOfTerminationCriterion = rootOfTerminationCriterion;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public BulkIterationNode getIterationNode() {
- if (this.template instanceof BulkIterationNode) {
- return (BulkIterationNode) this.template;
- } else {
- throw new RuntimeException();
- }
- }
-
- public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() {
- return this.partialSolutionPlanNode;
- }
-
- public PlanNode getRootOfStepFunction() {
- return this.rootOfStepFunction;
- }
-
- public PlanNode getRootOfTerminationCriterion() {
- return this.rootOfTerminationCriterion;
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- public TypeSerializerFactory<?> getSerializerForIterationChannel() {
- return serializerForIterationChannel;
- }
-
- public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
- this.serializerForIterationChannel = serializerForIterationChannel;
- }
-
- public void setCosts(Costs nodeCosts) {
- // add the costs from the step function
- nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts());
-
- // add the costs for the termination criterion, if it exists
- // the costs are divided at branches, so we can simply add them up
- if (rootOfTerminationCriterion != null) {
- nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts());
- }
-
- super.setCosts(nodeCosts);
- }
-
- public int getMemoryConsumerWeight() {
- return 1;
- }
-
-
- @Override
- public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
- if (source == this) {
- return FOUND_SOURCE;
- }
-
- SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
-
- if (fromOutside == FOUND_SOURCE_AND_DAM) {
- return FOUND_SOURCE_AND_DAM;
- }
- else if (fromOutside == FOUND_SOURCE) {
- // we always have a dam in the back channel
- return FOUND_SOURCE_AND_DAM;
- } else {
- // check the step function for dams
- return this.rootOfStepFunction.hasDamOnPathDownTo(source);
- }
- }
-
- @Override
- public void acceptForStepFunction(Visitor<PlanNode> visitor) {
- this.rootOfStepFunction.accept(visitor);
-
- if(this.rootOfTerminationCriterion != null) {
- this.rootOfTerminationCriterion.accept(visitor);
- }
- }
-
- private void mergeBranchPlanMaps() {
- for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) {
- OptimizerNode brancher = desc.getBranchingNode();
-
- if (branchPlan == null) {
- branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
- }
-
- if (!branchPlan.containsKey(brancher)) {
- PlanNode selectedCandidate = null;
-
- if (rootOfStepFunction.branchPlan != null) {
- selectedCandidate = rootOfStepFunction.branchPlan.get(brancher);
- }
-
- if (selectedCandidate == null) {
- throw new CompilerException(
- "Candidates for a node with open branches are missing information about the selected candidate ");
- }
-
- this.branchPlan.put(brancher, selectedCandidate);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
deleted file mode 100644
index df05b64..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/BulkPartialSolutionPlanNode.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.Collections;
-import java.util.HashMap;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.Visitor;
-
-/**
- * Plan candidate node for partial solution of a bulk iteration.
- */
-public class BulkPartialSolutionPlanNode extends PlanNode {
-
- private static final Costs NO_COSTS = new Costs();
-
- private BulkIterationPlanNode containingIterationNode;
-
- private Channel initialInput;
-
- public Object postPassHelper;
-
-
- public BulkPartialSolutionPlanNode(BulkPartialSolutionNode template, String nodeName,
- GlobalProperties gProps, LocalProperties lProps,
- Channel initialInput)
- {
- super(template, nodeName, DriverStrategy.NONE);
-
- this.globalProps = gProps;
- this.localProps = lProps;
- this.initialInput = initialInput;
-
- // the partial solution does not cost anything
- this.nodeCosts = NO_COSTS;
- this.cumulativeCosts = NO_COSTS;
-
- if (initialInput.getSource().branchPlan != null && initialInput.getSource().branchPlan.size() > 0) {
- if (this.branchPlan == null) {
- this.branchPlan = new HashMap<OptimizerNode, PlanNode>();
- }
-
- this.branchPlan.putAll(initialInput.getSource().branchPlan);
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public BulkPartialSolutionNode getPartialSolutionNode() {
- return (BulkPartialSolutionNode) this.template;
- }
-
- public BulkIterationPlanNode getContainingIterationNode() {
- return this.containingIterationNode;
- }
-
- public void setContainingIterationNode(BulkIterationPlanNode containingIterationNode) {
- this.containingIterationNode = containingIterationNode;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void accept(Visitor<PlanNode> visitor) {
- if (visitor.preVisit(this)) {
- visitor.postVisit(this);
- }
- }
-
- @Override
- public Iterable<PlanNode> getPredecessors() {
- return Collections.<PlanNode>emptyList();
- }
-
- @Override
- public Iterable<Channel> getInputs() {
- return Collections.<Channel>emptyList();
- }
-
- @Override
- public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
- if (source == this) {
- return FOUND_SOURCE;
- }
- SourceAndDamReport res = this.initialInput.getSource().hasDamOnPathDownTo(source);
- if (res == FOUND_SOURCE_AND_DAM) {
- return FOUND_SOURCE_AND_DAM;
- }
- else if (res == FOUND_SOURCE) {
- return (this.initialInput.getLocalStrategy().dams() ||
- this.initialInput.getTempMode().breaksPipeline() ||
- getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) ?
- FOUND_SOURCE_AND_DAM : FOUND_SOURCE;
- }
- else {
- return NOT_FOUND;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
deleted file mode 100644
index 875d1c3..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/Channel.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.EstimateProvider;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
-import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
-import org.apache.flink.optimizer.plandump.DumpableConnection;
-import org.apache.flink.optimizer.util.Utils;
-import org.apache.flink.runtime.io.network.DataExchangeMode;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-
-/**
- * A Channel represents the result produced by an operator and the data exchange
- * before the consumption by the target operator.
- *
- * The channel defines and tracks various properties and characteristics of the
- * data set and data exchange.
- *
- * Data set characteristics:
- * <ul>
- * <li>The "global properties" of the data, i.e., how the data is distributed across
- * partitions</li>
- * <li>The "required global properties" of the data, i.e., the global properties that, if absent,
- * would cause the program to return a wrong result.</li>
- * <li>The "local properties" of the data, i.e., how the data is organized within a partition</li>
- * <li>The "required local properties" of the data, i.e., the local properties that, if absent,
- * would cause the program to return a wrong result.</li>
- * </ul>
- *
- * Data exchange parameters:
- * <ul>
- * <li>The "ship strategy", i.e., whether to forward the data, shuffle it, broadcast it, ...</li>
- * <li>The "ship keys", which are the positions of the key fields in the exchanged records.</li>
- * <li>The "data exchange mode", which defines whether to pipeline or batch the exchange</li>
- * <li>Several more...</li>
- * </ul>
- */
-public class Channel implements EstimateProvider, Cloneable, DumpableConnection<PlanNode> {
-
- private PlanNode source;
-
- private PlanNode target;
-
- private ShipStrategyType shipStrategy = ShipStrategyType.NONE;
-
- private DataExchangeMode dataExchangeMode;
-
- private LocalStrategy localStrategy = LocalStrategy.NONE;
-
- private FieldList shipKeys;
-
- private FieldList localKeys;
-
- private boolean[] shipSortOrder;
-
- private boolean[] localSortOrder;
-
- private RequestedGlobalProperties requiredGlobalProps;
-
- private RequestedLocalProperties requiredLocalProps;
-
- private GlobalProperties globalProps;
-
- private LocalProperties localProps;
-
- private TypeSerializerFactory<?> serializer;
-
- private TypeComparatorFactory<?> shipStrategyComparator;
-
- private TypeComparatorFactory<?> localStrategyComparator;
-
- private DataDistribution dataDistribution;
-
- private Partitioner<?> partitioner;
-
- private TempMode tempMode;
-
- private double relativeTempMemory;
-
- private double relativeMemoryLocalStrategy;
-
- private int replicationFactor = 1;
-
- // --------------------------------------------------------------------------------------------
-
- public Channel(PlanNode sourceNode) {
- this(sourceNode, null);
- }
-
- public Channel(PlanNode sourceNode, TempMode tempMode) {
- this.source = sourceNode;
- this.tempMode = (tempMode == null ? TempMode.NONE : tempMode);
- }
-
- // --------------------------------------------------------------------------------------------
- // Accessors
- // --------------------------------------------------------------------------------------------
-
- /**
- * Gets the source of this Channel.
- *
- * @return The source.
- */
- @Override
- public PlanNode getSource() {
- return this.source;
- }
-
- /**
- * Sets the target of this Channel.
- *
- * @param target The target.
- */
- public void setTarget(PlanNode target) {
- this.target = target;
- }
-
- /**
- * Gets the target of this Channel.
- *
- * @return The target.
- */
- public PlanNode getTarget() {
- return this.target;
- }
-
- public void setShipStrategy(ShipStrategyType strategy, DataExchangeMode dataExchangeMode) {
- setShipStrategy(strategy, null, null, null, dataExchangeMode);
- }
-
- public void setShipStrategy(ShipStrategyType strategy, FieldList keys, DataExchangeMode dataExchangeMode) {
- setShipStrategy(strategy, keys, null, null, dataExchangeMode);
- }
-
- public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
- boolean[] sortDirection, DataExchangeMode dataExchangeMode) {
- setShipStrategy(strategy, keys, sortDirection, null, dataExchangeMode);
- }
-
- public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
- Partitioner<?> partitioner, DataExchangeMode dataExchangeMode) {
- setShipStrategy(strategy, keys, null, partitioner, dataExchangeMode);
- }
-
- public void setShipStrategy(ShipStrategyType strategy, FieldList keys,
- boolean[] sortDirection, Partitioner<?> partitioner,
- DataExchangeMode dataExchangeMode) {
- this.shipStrategy = strategy;
- this.shipKeys = keys;
- this.shipSortOrder = sortDirection;
- this.partitioner = partitioner;
- this.dataExchangeMode = dataExchangeMode;
- this.globalProps = null; // reset the global properties
- }
-
- /**
- * Gets the data exchange mode (batch / streaming) to use for the data
- * exchange of this channel.
- *
- * @return The data exchange mode of this channel.
- */
- public DataExchangeMode getDataExchangeMode() {
- return dataExchangeMode;
- }
-
- public ShipStrategyType getShipStrategy() {
- return this.shipStrategy;
- }
-
- public FieldList getShipStrategyKeys() {
- return this.shipKeys;
- }
-
- public boolean[] getShipStrategySortOrder() {
- return this.shipSortOrder;
- }
-
- public void setLocalStrategy(LocalStrategy strategy) {
- setLocalStrategy(strategy, null, null);
- }
-
- public void setLocalStrategy(LocalStrategy strategy, FieldList keys, boolean[] sortDirection) {
- this.localStrategy = strategy;
- this.localKeys = keys;
- this.localSortOrder = sortDirection;
- this.localProps = null; // reset the local properties
- }
-
- public LocalStrategy getLocalStrategy() {
- return this.localStrategy;
- }
-
- public FieldList getLocalStrategyKeys() {
- return this.localKeys;
- }
-
- public boolean[] getLocalStrategySortOrder() {
- return this.localSortOrder;
- }
-
- public void setDataDistribution(DataDistribution dataDistribution) {
- this.dataDistribution = dataDistribution;
- }
-
- public DataDistribution getDataDistribution() {
- return this.dataDistribution;
- }
-
- public Partitioner<?> getPartitioner() {
- return partitioner;
- }
-
- public TempMode getTempMode() {
- return this.tempMode;
- }
-
- /**
- * Sets the temp mode of the connection.
- *
- * @param tempMode
- * The temp mode of the connection.
- */
- public void setTempMode(TempMode tempMode) {
- this.tempMode = tempMode;
- }
-
- /**
- * Gets the memory for materializing the channel's result from this Channel.
- *
- * @return The temp memory.
- */
- public double getRelativeTempMemory() {
- return this.relativeTempMemory;
- }
-
- /**
- * Sets the memory for materializing the channel's result from this Channel.
- *
- * @param relativeTempMemory The memory for materialization.
- */
- public void setRelativeTempMemory(double relativeTempMemory) {
- this.relativeTempMemory = relativeTempMemory;
- }
-
- /**
- * Sets the replication factor of the connection.
- *
- * @param factor The replication factor of the connection.
- */
- public void setReplicationFactor(int factor) {
- this.replicationFactor = factor;
- }
-
- /**
- * Returns the replication factor of the connection.
- *
- * @return The replication factor of the connection.
- */
- public int getReplicationFactor() {
- return this.replicationFactor;
- }
-
- /**
- * Gets the serializer from this Channel.
- *
- * @return The serializer.
- */
- public TypeSerializerFactory<?> getSerializer() {
- return serializer;
- }
-
- /**
- * Sets the serializer for this Channel.
- *
- * @param serializer The serializer to set.
- */
- public void setSerializer(TypeSerializerFactory<?> serializer) {
- this.serializer = serializer;
- }
-
- /**
- * Gets the ship strategy comparator from this Channel.
- *
- * @return The ship strategy comparator.
- */
- public TypeComparatorFactory<?> getShipStrategyComparator() {
- return shipStrategyComparator;
- }
-
- /**
- * Sets the ship strategy comparator for this Channel.
- *
- * @param shipStrategyComparator The ship strategy comparator to set.
- */
- public void setShipStrategyComparator(TypeComparatorFactory<?> shipStrategyComparator) {
- this.shipStrategyComparator = shipStrategyComparator;
- }
-
- /**
- * Gets the local strategy comparator from this Channel.
- *
- * @return The local strategy comparator.
- */
- public TypeComparatorFactory<?> getLocalStrategyComparator() {
- return localStrategyComparator;
- }
-
- /**
- * Sets the local strategy comparator for this Channel.
- *
- * @param localStrategyComparator The local strategy comparator to set.
- */
- public void setLocalStrategyComparator(TypeComparatorFactory<?> localStrategyComparator) {
- this.localStrategyComparator = localStrategyComparator;
- }
-
- public double getRelativeMemoryLocalStrategy() {
- return relativeMemoryLocalStrategy;
- }
-
- public void setRelativeMemoryLocalStrategy(double relativeMemoryLocalStrategy) {
- this.relativeMemoryLocalStrategy = relativeMemoryLocalStrategy;
- }
-
- public boolean isOnDynamicPath() {
- return this.source.isOnDynamicPath();
- }
-
- public int getCostWeight() {
- return this.source.getCostWeight();
- }
-
- // --------------------------------------------------------------------------------------------
- // Statistic Estimates
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public long getEstimatedOutputSize() {
- long estimate = this.source.template.getEstimatedOutputSize();
- return estimate < 0 ? estimate : estimate * this.replicationFactor;
- }
-
- @Override
- public long getEstimatedNumRecords() {
- long estimate = this.source.template.getEstimatedNumRecords();
- return estimate < 0 ? estimate : estimate * this.replicationFactor;
- }
-
- @Override
- public float getEstimatedAvgWidthPerOutputRecord() {
- return this.source.template.getEstimatedAvgWidthPerOutputRecord();
- }
-
- // --------------------------------------------------------------------------------------------
- // Data Property Handling
- // --------------------------------------------------------------------------------------------
-
-
- public RequestedGlobalProperties getRequiredGlobalProps() {
- return requiredGlobalProps;
- }
-
- public void setRequiredGlobalProps(RequestedGlobalProperties requiredGlobalProps) {
- this.requiredGlobalProps = requiredGlobalProps;
- }
-
- public RequestedLocalProperties getRequiredLocalProps() {
- return requiredLocalProps;
- }
-
- public void setRequiredLocalProps(RequestedLocalProperties requiredLocalProps) {
- this.requiredLocalProps = requiredLocalProps;
- }
-
- public GlobalProperties getGlobalProperties() {
- if (this.globalProps == null) {
- this.globalProps = this.source.getGlobalProperties().clone();
- switch (this.shipStrategy) {
- case BROADCAST:
- this.globalProps.clearUniqueFieldCombinations();
- this.globalProps.setFullyReplicated();
- break;
- case PARTITION_HASH:
- this.globalProps.setHashPartitioned(this.shipKeys);
- break;
- case PARTITION_RANGE:
- this.globalProps.setRangePartitioned(Utils.createOrdering(this.shipKeys, this.shipSortOrder));
- break;
- case FORWARD:
- break;
- case PARTITION_RANDOM:
- this.globalProps.reset();
- break;
- case PARTITION_FORCED_REBALANCE:
- this.globalProps.setForcedRebalanced();
- break;
- case PARTITION_CUSTOM:
- this.globalProps.setCustomPartitioned(this.shipKeys, this.partitioner);
- break;
- case NONE:
- throw new CompilerException("Cannot produce GlobalProperties before ship strategy is set.");
- }
- }
-
- return this.globalProps;
- }
-
- public LocalProperties getLocalProperties() {
- if (this.localProps == null) {
- computeLocalPropertiesAfterShippingOnly();
- switch (this.localStrategy) {
- case NONE:
- break;
- case SORT:
- case COMBININGSORT:
- this.localProps = LocalProperties.forOrdering(Utils.createOrdering(this.localKeys, this.localSortOrder));
- break;
- default:
- throw new CompilerException("Unsupported local strategy for channel.");
- }
- }
-
- return this.localProps;
- }
-
- private void computeLocalPropertiesAfterShippingOnly() {
- switch (this.shipStrategy) {
- case BROADCAST:
- case PARTITION_HASH:
- case PARTITION_CUSTOM:
- case PARTITION_RANGE:
- case PARTITION_RANDOM:
- case PARTITION_FORCED_REBALANCE:
- this.localProps = new LocalProperties();
- break;
- case FORWARD:
- this.localProps = this.source.getLocalProperties();
- break;
- case NONE:
- throw new CompilerException("ShipStrategy has not yet been set.");
- default:
- throw new CompilerException("Unknown ShipStrategy.");
- }
- }
-
- public void adjustGlobalPropertiesForFullParallelismChange() {
- if (this.shipStrategy == null || this.shipStrategy == ShipStrategyType.NONE) {
- throw new IllegalStateException("Cannot adjust channel for degree of parallelism " +
- "change before the ship strategy is set.");
- }
-
- // make sure the properties are acquired
- if (this.globalProps == null) {
- getGlobalProperties();
- }
-
- // some strategies globally reestablish properties
- switch (this.shipStrategy) {
- case FORWARD:
- throw new CompilerException("Cannot use FORWARD strategy between operations " +
- "with different number of parallel instances.");
- case NONE: // excluded by sanity check. left here for verification check completion
- case BROADCAST:
- case PARTITION_HASH:
- case PARTITION_RANGE:
- case PARTITION_RANDOM:
- case PARTITION_FORCED_REBALANCE:
- case PARTITION_CUSTOM:
- return;
- }
- throw new CompilerException("Unrecognized Ship Strategy Type: " + this.shipStrategy);
- }
-
- // --------------------------------------------------------------------------------------------
-
- /**
- * Utility method used while swapping binary union nodes for n-ary union nodes.
- */
- public void swapUnionNodes(PlanNode newUnionNode) {
- if (!(this.source instanceof BinaryUnionPlanNode)) {
- throw new IllegalStateException();
- } else {
- this.source = newUnionNode;
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- public int getMaxDepth() {
- return this.source.getOptimizerNode().getMaxDepth() + 1;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return "Channel (" + this.source + (this.target == null ? ')' : ") -> (" + this.target + ')') +
- '[' + this.shipStrategy + "] [" + this.localStrategy + "] " +
- (this.tempMode == null || this.tempMode == TempMode.NONE ? "{NO-TEMP}" : this.tempMode);
- }
-
- @Override
- public Channel clone() {
- try {
- return (Channel) super.clone();
- } catch (CloneNotSupportedException cnsex) {
- throw new RuntimeException(cnsex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
deleted file mode 100644
index 01c56dd..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/DualInputPlanNode.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
-import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.NOT_FOUND;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.operators.util.FieldList;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.optimizer.dag.OptimizerNode;
-import org.apache.flink.optimizer.dag.TwoInputNode;
-import org.apache.flink.runtime.operators.DamBehavior;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.util.Visitor;
-
-/**
- *
- */
-public class DualInputPlanNode extends PlanNode {
-
- protected final Channel input1;
- protected final Channel input2;
-
- protected final FieldList keys1;
- protected final FieldList keys2;
-
- protected final boolean[] sortOrders;
-
- private TypeComparatorFactory<?> comparator1;
- private TypeComparatorFactory<?> comparator2;
- private TypePairComparatorFactory<?, ?> pairComparator;
-
- public Object postPassHelper1;
- public Object postPassHelper2;
-
- // --------------------------------------------------------------------------------------------
-
- public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy diverStrategy) {
- this(template, nodeName, input1, input2, diverStrategy, null, null, null);
- }
-
- public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2,
- DriverStrategy diverStrategy, FieldList driverKeyFields1, FieldList driverKeyFields2)
- {
- this(template, nodeName, input1, input2, diverStrategy, driverKeyFields1, driverKeyFields2,
- SingleInputPlanNode.getTrueArray(driverKeyFields1.size()));
- }
-
- public DualInputPlanNode(OptimizerNode template, String nodeName, Channel input1, Channel input2, DriverStrategy diverStrategy,
- FieldList driverKeyFields1, FieldList driverKeyFields2, boolean[] driverSortOrders)
- {
- super(template, nodeName, diverStrategy);
- this.input1 = input1;
- this.input2 = input2;
- this.keys1 = driverKeyFields1;
- this.keys2 = driverKeyFields2;
- this.sortOrders = driverSortOrders;
-
- if (this.input1.getShipStrategy() == ShipStrategyType.BROADCAST) {
- this.input1.setReplicationFactor(getParallelism());
- }
- if (this.input2.getShipStrategy() == ShipStrategyType.BROADCAST) {
- this.input2.setReplicationFactor(getParallelism());
- }
-
- mergeBranchPlanMaps(input1.getSource(), input2.getSource());
- }
-
- // --------------------------------------------------------------------------------------------
-
- public TwoInputNode getTwoInputNode() {
- if (this.template instanceof TwoInputNode) {
- return (TwoInputNode) this.template;
- } else {
- throw new RuntimeException();
- }
- }
-
- public FieldList getKeysForInput1() {
- return this.keys1;
- }
-
- public FieldList getKeysForInput2() {
- return this.keys2;
- }
-
- public boolean[] getSortOrders() {
- return this.sortOrders;
- }
-
- public TypeComparatorFactory<?> getComparator1() {
- return this.comparator1;
- }
-
- public TypeComparatorFactory<?> getComparator2() {
- return this.comparator2;
- }
-
- public void setComparator1(TypeComparatorFactory<?> comparator) {
- this.comparator1 = comparator;
- }
-
- public void setComparator2(TypeComparatorFactory<?> comparator) {
- this.comparator2 = comparator;
- }
-
- public TypePairComparatorFactory<?, ?> getPairComparator() {
- return this.pairComparator;
- }
-
- public void setPairComparator(TypePairComparatorFactory<?, ?> comparator) {
- this.pairComparator = comparator;
- }
-
- /**
- * Gets the first input channel to this node.
- *
- * @return The first input channel to this node.
- */
- public Channel getInput1() {
- return this.input1;
- }
-
- /**
- * Gets the second input channel to this node.
- *
- * @return The second input channel to this node.
- */
- public Channel getInput2() {
- return this.input2;
- }
-
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public void accept(Visitor<PlanNode> visitor) {
- if (visitor.preVisit(this)) {
- this.input1.getSource().accept(visitor);
- this.input2.getSource().accept(visitor);
-
- for (Channel broadcastInput : getBroadcastInputs()) {
- broadcastInput.getSource().accept(visitor);
- }
-
- visitor.postVisit(this);
- }
- }
-
-
- @Override
- public Iterable<PlanNode> getPredecessors() {
- if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
- return Arrays.asList(this.input1.getSource(), this.input2.getSource());
- } else {
- List<PlanNode> preds = new ArrayList<PlanNode>();
-
- preds.add(input1.getSource());
- preds.add(input2.getSource());
-
- for (Channel c : getBroadcastInputs()) {
- preds.add(c.getSource());
- }
-
- return preds;
- }
- }
-
- @Override
- public Iterable<Channel> getInputs() {
- return Arrays.asList(this.input1, this.input2);
- }
-
-
- @Override
- public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
- if (source == this) {
- return FOUND_SOURCE;
- }
-
- // check first input
- SourceAndDamReport res1 = this.input1.getSource().hasDamOnPathDownTo(source);
- if (res1 == FOUND_SOURCE_AND_DAM) {
- return FOUND_SOURCE_AND_DAM;
- }
- else if (res1 == FOUND_SOURCE) {
- if (this.input1.getLocalStrategy().dams() || this.input1.getTempMode().breaksPipeline() ||
- getDriverStrategy().firstDam() == DamBehavior.FULL_DAM) {
- return FOUND_SOURCE_AND_DAM;
- } else {
- return FOUND_SOURCE;
- }
- }
- else {
- SourceAndDamReport res2 = this.input2.getSource().hasDamOnPathDownTo(source);
- if (res2 == FOUND_SOURCE_AND_DAM) {
- return FOUND_SOURCE_AND_DAM;
- }
- else if (res2 == FOUND_SOURCE) {
- if (this.input2.getLocalStrategy().dams() || this.input2.getTempMode().breaksPipeline() ||
- getDriverStrategy().secondDam() == DamBehavior.FULL_DAM) {
- return FOUND_SOURCE_AND_DAM;
- } else {
- return FOUND_SOURCE;
- }
- }
- else {
- // NOT_FOUND
- // check the broadcast inputs
-
- for (NamedChannel nc : getBroadcastInputs()) {
- SourceAndDamReport bcRes = nc.getSource().hasDamOnPathDownTo(source);
- if (bcRes != NOT_FOUND) {
- // broadcast inputs are always dams
- return FOUND_SOURCE_AND_DAM;
- }
- }
- return NOT_FOUND;
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
deleted file mode 100644
index d146c83..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/FlinkPlan.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-/**
- * A common interface for compiled Flink plans for both batch and streaming
- * processing programs.
- *
- */
-public interface FlinkPlan {
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
deleted file mode 100644
index 38f76b2..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/IterationPlanNode.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.IterationNode;
-import org.apache.flink.util.Visitor;
-
-/**
- *
- */
-public interface IterationPlanNode {
-
- void acceptForStepFunction(Visitor<PlanNode> visitor);
-
- IterationNode getIterationNode();
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
deleted file mode 100644
index 3650eea..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NAryUnionPlanNode.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.optimizer.costs.Costs;
-import org.apache.flink.optimizer.dag.BinaryUnionNode;
-import org.apache.flink.optimizer.dataproperties.GlobalProperties;
-import org.apache.flink.optimizer.dataproperties.LocalProperties;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.util.IterableIterator;
-import org.apache.flink.util.Visitor;
-
-/**
- * A union operation over multiple inputs (2 or more).
- */
-public class NAryUnionPlanNode extends PlanNode {
-
- private final List<Channel> inputs;
-
- /**
- * @param template
- */
- public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps,
- Costs cumulativeCosts)
- {
- super(template, "Union", DriverStrategy.NONE);
-
- this.inputs = inputs;
- this.globalProps = gProps;
- this.localProps = new LocalProperties();
- this.nodeCosts = new Costs();
- this.cumulativeCosts = cumulativeCosts;
- }
-
- @Override
- public void accept(Visitor<PlanNode> visitor) {
- visitor.preVisit(this);
- for (Channel c : this.inputs) {
- c.getSource().accept(visitor);
- }
- visitor.postVisit(this);
- }
-
- public List<Channel> getListOfInputs() {
- return this.inputs;
- }
-
- @Override
- public Iterable<Channel> getInputs() {
- return Collections.unmodifiableList(this.inputs);
- }
-
- @Override
- public Iterable<PlanNode> getPredecessors() {
- final Iterator<Channel> channels = this.inputs.iterator();
- return new IterableIterator<PlanNode>() {
-
- @Override
- public boolean hasNext() {
- return channels.hasNext();
- }
-
- @Override
- public PlanNode next() {
- return channels.next().getSource();
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Iterator<PlanNode> iterator() {
- return this;
- }
- };
- }
-
- @Override
- public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
- // this node is used after the plan enumeration. consequently, this will never be invoked here
- throw new UnsupportedOperationException();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
deleted file mode 100644
index da97e61..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/NamedChannel.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.optimizer.plan;
-
-import org.apache.flink.optimizer.dag.TempMode;
-
-public class NamedChannel extends Channel {
-
- private final String name;
-
- /**
- * Initializes NamedChannel.
- *
- * @param sourceNode
- */
- public NamedChannel(String name, PlanNode sourceNode) {
- super(sourceNode);
- this.name = name;
- }
-
- public NamedChannel(String name, PlanNode sourceNode, TempMode tempMode) {
- super(sourceNode, tempMode);
- this.name = name;
- }
-
- public String getName() {
- return this.name;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java b/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
deleted file mode 100644
index d56be87..0000000
--- a/flink-compiler/src/main/java/org/apache/flink/optimizer/plan/OptimizedPlan.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.optimizer.plan;
-
-import java.util.Collection;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.util.Visitable;
-import org.apache.flink.util.Visitor;
-
-/**
- * The execution plan generated by the Optimizer. It contains {@link PlanNode}s
- * and {@link Channel}s that describe exactly how the program should be executed.
- * It defines all ship strategies (local pipe, shuffle, broadcast, rebalance), all
- * operator strategies (sorting-merge join, hash join, sorted grouping, ...),
- * and the data exchange modes (batched, pipelined).
- */
-public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
-
- /** The data sources in the plan. */
- private final Collection<SourcePlanNode> dataSources;
-
- /** The data sinks in the plan. */
- private final Collection<SinkPlanNode> dataSinks;
-
- /** All nodes in the optimizer plan. */
- private final Collection<PlanNode> allNodes;
-
- /** The original program. */
- private final Plan originalProgram;
-
- /** Name of the job */
- private final String jobName;
-
- /**
- * Creates a new instance of this optimizer plan container. The plan is given and fully
- * described by the data sources, sinks and the collection of all nodes.
- *
- * @param sources The data sources.
- * @param sinks The data sinks.
- * @param allNodes A collection containing all nodes in the plan.
- * @param jobName The name of the program
- */
- public OptimizedPlan(Collection<SourcePlanNode> sources, Collection<SinkPlanNode> sinks,
- Collection<PlanNode> allNodes, String jobName, Plan programPlan)
- {
- this.dataSources = sources;
- this.dataSinks = sinks;
- this.allNodes = allNodes;
- this.jobName = jobName;
- this.originalProgram = programPlan;
- }
-
- /**
- * Gets the data sources from this OptimizedPlan.
- *
- * @return The data sources.
- */
- public Collection<SourcePlanNode> getDataSources() {
- return dataSources;
- }
-
- /**
- * Gets the data sinks from this OptimizedPlan.
- *
- * @return The data sinks.
- */
- public Collection<SinkPlanNode> getDataSinks() {
- return dataSinks;
- }
-
- /**
- * Gets all the nodes from this OptimizedPlan.
- *
- * @return All nodes.
- */
- public Collection<PlanNode> getAllNodes() {
- return allNodes;
- }
-
- /**
- * Returns the name of the program.
- *
- * @return The name of the program.
- */
- public String getJobName() {
- return this.jobName;
- }
-
- /**
- * Gets the original program plan from which this optimized plan was created.
- *
- * @return The original program plan.
- */
- public Plan getOriginalPactPlan() {
- return this.originalProgram;
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Applies the given visitor top down to all nodes, starting at the sinks.
- *
- * @param visitor
- * The visitor to apply to the nodes in this plan.
- * @see org.apache.flink.util.Visitable#accept(org.apache.flink.util.Visitor)
- */
- @Override
- public void accept(Visitor<PlanNode> visitor) {
- for (SinkPlanNode node : this.dataSinks) {
- node.accept(visitor);
- }
- }
-}