You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:53 UTC
[14/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
new file mode 100644
index 0000000..3fabad6
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/CrossStreamOuterSecondDescriptor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Cartesian product descriptor that selects the driver strategy
+ * {@link DriverStrategy#NESTEDLOOP_STREAMED_OUTER_SECOND}.
+ *
+ * <p>The local properties of the result are derived from the second input only.
+ */
+public class CrossStreamOuterSecondDescriptor extends CartesianProductDescriptor {
+
+    /**
+     * Creates a descriptor with both broadcast flags set to {@code true}.
+     */
+    public CrossStreamOuterSecondDescriptor() {
+        this(true, true);
+    }
+
+    /**
+     * Creates a descriptor and hands both broadcast flags to the parent descriptor.
+     *
+     * @param allowBroadcastFirst broadcast flag for the first input, passed to the super constructor
+     * @param allowBroadcastSecond broadcast flag for the second input, passed to the super constructor
+     */
+    public CrossStreamOuterSecondDescriptor(boolean allowBroadcastFirst, boolean allowBroadcastSecond) {
+        super(allowBroadcastFirst, allowBroadcastSecond);
+    }
+
+    /**
+     * @return always {@link DriverStrategy#NESTEDLOOP_STREAMED_OUTER_SECOND}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND;
+    }
+
+    /**
+     * Derives the local properties of the cross result from the second input.
+     *
+     * @param in1 local properties of the first input (not consulted)
+     * @param in2 local properties of the second input
+     * @return a grouping on the first unique field set of {@code in2} when {@code in2} has
+     *         no grouped fields but has unique fields; otherwise the result of
+     *         {@code in2.clearUniqueFieldSets()}
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+        // the second input already has grouped fields: only the unique field sets are cleared
+        if (in2.getGroupedFields() != null && in2.getGroupedFields().size() != 0) {
+            return in2.clearUniqueFieldSets();
+        }
+
+        // no unique fields available that could be turned into a grouping
+        if (in2.getUniqueFields() == null || in2.getUniqueFields().size() <= 0) {
+            return in2.clearUniqueFieldSets();
+        }
+
+        // uniqueness becomes grouping with streamed nested loops
+        return LocalProperties.forGrouping(in2.getUniqueFields().iterator().next().toFieldList());
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
new file mode 100644
index 0000000..81c823f
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FilterDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Single-input operator descriptor for filters. The filter is executed with the
+ * {@link DriverStrategy#FLAT_MAP} driver strategy and hands the global and local
+ * properties of its input on unchanged.
+ */
+public class FilterDescriptor extends OperatorDescriptorSingle {
+
+    /**
+     * @return always {@link DriverStrategy#FLAT_MAP}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.FLAT_MAP;
+    }
+
+    /**
+     * Creates the plan node for the filter.
+     *
+     * @param in the input channel of the new plan node
+     * @param node the optimizer node the plan node is created for; its operator name
+     *             becomes part of the plan node name
+     * @return a plan node named {@code "Filter (<operator name>)"} using the FLAT_MAP strategy
+     */
+    @Override
+    public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+        final String planNodeName = "Filter (" + node.getOperator().getName() + ")";
+        return new SingleInputPlanNode(node, planNodeName, in, DriverStrategy.FLAT_MAP);
+    }
+
+    /**
+     * @return a single-element list holding requested global properties that were
+     *         set to "any distribution"
+     */
+    @Override
+    protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+        final RequestedGlobalProperties anyDistribution = new RequestedGlobalProperties();
+        anyDistribution.setAnyDistribution();
+        return Collections.singletonList(anyDistribution);
+    }
+
+    /**
+     * @return a single-element list holding default-constructed requested local properties
+     */
+    @Override
+    protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+        final RequestedLocalProperties defaults = new RequestedLocalProperties();
+        return Collections.singletonList(defaults);
+    }
+
+    /**
+     * @param gProps the global properties of the input
+     * @return the very same {@code gProps} instance, unmodified
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+        return gProps;
+    }
+
+    /**
+     * @param lProps the local properties of the input
+     * @return the very same {@code lProps} instance, unmodified
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties lProps) {
+        return lProps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
new file mode 100644
index 0000000..b915e45
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/FlatMapDescriptor.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+/**
+ * Single-input operator descriptor for flat maps, executed with the
+ * {@link DriverStrategy#FLAT_MAP} driver strategy. Uniqueness information of the
+ * input is removed from both the global and the local properties of the result.
+ */
+public class FlatMapDescriptor extends OperatorDescriptorSingle {
+
+    /**
+     * @return always {@link DriverStrategy#FLAT_MAP}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.FLAT_MAP;
+    }
+
+    /**
+     * Creates the plan node for the flat map.
+     *
+     * @param in the input channel of the new plan node
+     * @param node the optimizer node the plan node is created for; its operator name
+     *             becomes part of the plan node name
+     * @return a plan node named {@code "FlatMap (<operator name>)"} using the FLAT_MAP strategy
+     */
+    @Override
+    public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+        final String planNodeName = "FlatMap (" + node.getOperator().getName() + ")";
+        return new SingleInputPlanNode(node, planNodeName, in, DriverStrategy.FLAT_MAP);
+    }
+
+    /**
+     * @return a single-element list holding requested global properties that were
+     *         set to "any distribution"
+     */
+    @Override
+    protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+        final RequestedGlobalProperties anyDistribution = new RequestedGlobalProperties();
+        anyDistribution.setAnyDistribution();
+        return Collections.singletonList(anyDistribution);
+    }
+
+    /**
+     * @return a single-element list holding default-constructed requested local properties
+     */
+    @Override
+    protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+        final RequestedLocalProperties defaults = new RequestedLocalProperties();
+        return Collections.singletonList(defaults);
+    }
+
+    /**
+     * Adjusts the given global properties in place and returns them.
+     *
+     * <p>If the input has at least one unique field combination and is randomly
+     * partitioned, the first unique field combination is recorded as an "any partitioning"
+     * on those fields. In every case the unique field combinations are cleared afterwards.
+     *
+     * @param gProps the global properties of the input; this instance is modified
+     * @return the same, modified {@code gProps} instance
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+        final boolean hasUniqueFields = gProps.getUniqueFieldCombination() != null
+                && gProps.getUniqueFieldCombination().size() > 0;
+
+        if (hasUniqueFields && gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+            gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+        }
+
+        gProps.clearUniqueFieldCombinations();
+        return gProps;
+    }
+
+    /**
+     * @param lProps the local properties of the input
+     * @return the result of {@code lProps.clearUniqueFieldSets()}
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties lProps) {
+        return lProps.clearUniqueFieldSets();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
new file mode 100644
index 0000000..b648386
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupCombineProperties.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The properties file belonging to the GroupCombineNode. It translates the GroupCombine operation
+ * to the driver strategy SORTED_GROUP_COMBINE and sets the relevant grouping and sorting keys.
+ * @see org.apache.flink.optimizer.dag.GroupCombineNode
+ */
+public final class GroupCombineProperties extends OperatorDescriptorSingle {
+
+ private final Ordering ordering; // ordering that we need to use if an additional ordering is requested
+
+ public GroupCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+ super(groupKeys);
+
+ // if we have an additional ordering, construct the ordering to have primarily the grouping fields
+
+ this.ordering = new Ordering();
+ for (Integer key : this.keyList) {
+ this.ordering.appendOrdering(key, null, Order.ANY);
+ }
+
+ // and next the additional order fields
+ if (additionalOrderKeys != null) {
+ for (int i = 0; i < additionalOrderKeys.getNumberOfFields(); i++) {
+ Integer field = additionalOrderKeys.getFieldNumber(i);
+ Order order = additionalOrderKeys.getOrder(i);
+ this.ordering.appendOrdering(field, additionalOrderKeys.getType(i), order);
+ }
+ }
+
+ }
+
+ @Override
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.SORTED_GROUP_COMBINE;
+ }
+
+ @Override
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ node.setDegreeOfParallelism(in.getSource().getParallelism());
+
+ // sorting key info
+ SingleInputPlanNode singleInputPlanNode = new SingleInputPlanNode(
+ node,
+ "GroupCombine (" + node.getOperator().getName() + ")",
+ in, // reuse the combine strategy also used in the group reduce
+ DriverStrategy.SORTED_GROUP_COMBINE, this.keyList);
+
+ // set sorting comparator key info
+ singleInputPlanNode.setDriverKeyInfo(this.ordering.getInvolvedIndexes(), this.ordering.getFieldSortDirections(), 0);
+ // set grouping comparator key info
+ singleInputPlanNode.setDriverKeyInfo(this.keyList, 1);
+
+ return singleInputPlanNode;
+ }
+
+ @Override
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ RequestedGlobalProperties props = new RequestedGlobalProperties();
+ props.setRandomPartitioning();
+ return Collections.singletonList(props);
+ }
+
+ @Override
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ return Collections.singletonList(new RequestedLocalProperties());
+ }
+
+ @Override
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+ gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+ gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+ }
+ gProps.clearUniqueFieldCombinations();
+ return gProps;
+ }
+
+ @Override
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps.clearUniqueFieldSets();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
new file mode 100644
index 0000000..ebd09f2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceProperties.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Operator descriptor for a group reduce that is executed with the driver strategy
+ * {@link DriverStrategy#SORTED_GROUP_REDUCE}. It requests a partitioning on the grouping
+ * keys (optionally through a custom partitioner) and either a grouping or, when additional
+ * order keys were given, an ordering as local property.
+ */
+public final class GroupReduceProperties extends OperatorDescriptorSingle {
+
+    /** Ordering to request when additional order keys were given; {@code null} otherwise. */
+    private final Ordering ordering;
+
+    /** Custom partitioner to request; {@code null} when none was given. */
+    private final Partitioner<?> customPartitioner;
+
+
+    /**
+     * Creates a descriptor without additional ordering and without custom partitioner.
+     *
+     * @param keys the grouping keys
+     */
+    public GroupReduceProperties(FieldSet keys) {
+        this(keys, null, null);
+    }
+
+    /**
+     * Creates a descriptor without custom partitioner.
+     *
+     * @param keys the grouping keys
+     * @param additionalOrderKeys order fields appended after the grouping keys; may be {@code null}
+     */
+    public GroupReduceProperties(FieldSet keys, Ordering additionalOrderKeys) {
+        this(keys, additionalOrderKeys, null);
+    }
+
+    /**
+     * Creates a descriptor without additional ordering.
+     *
+     * @param keys the grouping keys
+     * @param customPartitioner the partitioner to request; may be {@code null}
+     */
+    public GroupReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) {
+        this(keys, null, customPartitioner);
+    }
+
+    /**
+     * Creates the descriptor.
+     *
+     * @param groupKeys the grouping keys, handed to the super constructor
+     * @param additionalOrderKeys order fields appended after the grouping keys; when
+     *                            {@code null}, no ordering is built
+     * @param customPartitioner the partitioner to request; may be {@code null}
+     */
+    public GroupReduceProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) {
+        super(groupKeys);
+
+        this.customPartitioner = customPartitioner;
+
+        if (additionalOrderKeys == null) {
+            this.ordering = null;
+        }
+        else {
+            // with an additional ordering, the grouping fields come first ...
+            final Ordering combined = new Ordering();
+            for (Integer groupField : this.keyList) {
+                combined.appendOrdering(groupField, null, Order.ANY);
+            }
+
+            // ... followed by the additional order fields
+            for (int pos = 0; pos < additionalOrderKeys.getNumberOfFields(); pos++) {
+                Integer orderField = additionalOrderKeys.getFieldNumber(pos);
+                Order direction = additionalOrderKeys.getOrder(pos);
+                combined.appendOrdering(orderField, additionalOrderKeys.getType(pos), direction);
+            }
+
+            this.ordering = combined;
+        }
+    }
+
+    /**
+     * @return always {@link DriverStrategy#SORTED_GROUP_REDUCE}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.SORTED_GROUP_REDUCE;
+    }
+
+    /**
+     * Creates the plan node for the group reduce.
+     *
+     * @param in the input channel of the new plan node
+     * @param node the optimizer node the plan node is created for
+     * @return a plan node named {@code "GroupReduce (<operator name>)"} using the
+     *         SORTED_GROUP_REDUCE strategy and this descriptor's key list
+     */
+    @Override
+    public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+        final String planNodeName = "GroupReduce (" + node.getOperator().getName() + ")";
+        return new SingleInputPlanNode(node, planNodeName, in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+    }
+
+    /**
+     * @return a single-element list: a custom partitioning on the keys when a custom
+     *         partitioner is set, otherwise "any partitioning" on the keys
+     */
+    @Override
+    protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+        final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+
+        if (this.customPartitioner != null) {
+            requested.setCustomPartitioned(this.keys, this.customPartitioner);
+        } else {
+            requested.setAnyPartitioning(this.keys);
+        }
+        return Collections.singletonList(requested);
+    }
+
+    /**
+     * @return a single-element list: the combined ordering when one was built,
+     *         otherwise a grouping on the keys
+     */
+    @Override
+    protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+        final RequestedLocalProperties requested = new RequestedLocalProperties();
+
+        if (this.ordering != null) {
+            requested.setOrdering(this.ordering);
+        } else {
+            requested.setGroupedFields(this.keys);
+        }
+        return Collections.singletonList(requested);
+    }
+
+    /**
+     * Adjusts the given global properties in place and returns them.
+     *
+     * <p>If the input has at least one unique field combination and is randomly
+     * partitioned, the first unique field combination is recorded as an "any partitioning"
+     * on those fields. In every case the unique field combinations are cleared afterwards.
+     *
+     * @param gProps the global properties of the input; this instance is modified
+     * @return the same, modified {@code gProps} instance
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+        final boolean hasUniqueFields = gProps.getUniqueFieldCombination() != null
+                && gProps.getUniqueFieldCombination().size() > 0;
+
+        if (hasUniqueFields && gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+            gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+        }
+
+        gProps.clearUniqueFieldCombinations();
+        return gProps;
+    }
+
+    /**
+     * @param lProps the local properties of the input
+     * @return the result of {@code lProps.clearUniqueFieldSets()}
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties lProps) {
+        return lProps.clearUniqueFieldSets();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
new file mode 100644
index 0000000..c4f47d3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+/**
+ * Operator descriptor for a group reduce with a combine step, executed with the driver
+ * strategy {@link DriverStrategy#SORTED_GROUP_REDUCE}. When the input is not forwarded,
+ * an additional combiner plan node ({@link DriverStrategy#SORTED_GROUP_COMBINE}) is placed
+ * in front of the shipping step.
+ */
+public final class GroupReduceWithCombineProperties extends OperatorDescriptorSingle {
+
+    /** Ordering to request when additional order keys were given; {@code null} otherwise. */
+    private final Ordering ordering;
+
+    /** Custom partitioner to request; {@code null} when none was given. */
+    private final Partitioner<?> customPartitioner;
+
+
+    /**
+     * Creates a descriptor without additional ordering and without custom partitioner.
+     *
+     * @param groupKeys the grouping keys
+     */
+    public GroupReduceWithCombineProperties(FieldSet groupKeys) {
+        this(groupKeys, null, null);
+    }
+
+    /**
+     * Creates a descriptor without custom partitioner.
+     *
+     * @param groupKeys the grouping keys
+     * @param additionalOrderKeys order fields appended after the grouping keys; may be {@code null}
+     */
+    public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys) {
+        this(groupKeys, additionalOrderKeys, null);
+    }
+
+    /**
+     * Creates a descriptor without additional ordering.
+     *
+     * @param groupKeys the grouping keys
+     * @param customPartitioner the partitioner to request; may be {@code null}
+     */
+    public GroupReduceWithCombineProperties(FieldSet groupKeys, Partitioner<?> customPartitioner) {
+        this(groupKeys, null, customPartitioner);
+    }
+
+    /**
+     * Creates the descriptor.
+     *
+     * @param groupKeys the grouping keys, handed to the super constructor
+     * @param additionalOrderKeys order fields appended after the grouping keys; when
+     *                            {@code null}, no ordering is built
+     * @param customPartitioner the partitioner to request; may be {@code null}
+     */
+    public GroupReduceWithCombineProperties(FieldSet groupKeys, Ordering additionalOrderKeys, Partitioner<?> customPartitioner) {
+        super(groupKeys);
+
+        this.customPartitioner = customPartitioner;
+
+        if (additionalOrderKeys == null) {
+            this.ordering = null;
+        } else {
+            // with an additional ordering, the grouping fields come first ...
+            final Ordering combined = new Ordering();
+            for (Integer groupField : this.keyList) {
+                combined.appendOrdering(groupField, null, Order.ANY);
+            }
+
+            // ... followed by the additional order fields
+            for (int pos = 0; pos < additionalOrderKeys.getNumberOfFields(); pos++) {
+                Integer orderField = additionalOrderKeys.getFieldNumber(pos);
+                Order direction = additionalOrderKeys.getOrder(pos);
+                combined.appendOrdering(orderField, additionalOrderKeys.getType(pos), direction);
+            }
+
+            this.ordering = combined;
+        }
+    }
+
+    /**
+     * @return always {@link DriverStrategy#SORTED_GROUP_REDUCE}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.SORTED_GROUP_REDUCE;
+    }
+
+    /**
+     * Creates the reduce plan node and, for non-forwarded inputs, a preceding combiner.
+     *
+     * <p>Forward case: a {@code SORT} local strategy on {@code in} is replaced by
+     * {@code COMBININGSORT} with the same keys and sort order, and the reducer reads
+     * directly from {@code in}.
+     *
+     * <p>Non-forward case: a combiner plan node is attached to the source of {@code in}
+     * through a new forwarding, pipelined channel. The reducer then reads from a second new
+     * channel that carries the ship strategy of {@code in} and a {@code COMBININGSORT}
+     * local strategy.
+     *
+     * @param in the input channel; its local strategy may be modified in the forward case
+     * @param node the optimizer node; cast to {@link GroupReduceNode} in the non-forward case
+     * @return the plan node of the reducer
+     * @throws RuntimeException if, in the forward case, the sort keys of {@code in} are not
+     *                          a valid unordered prefix of the grouping keys
+     */
+    @Override
+    public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+        if (in.getShipStrategy() == ShipStrategyType.FORWARD) {
+            // a plain sort on this channel is upgraded to a combining sort;
+            // that is only consistent if the sort covers the grouping keys
+            if (in.getLocalStrategy() == LocalStrategy.SORT) {
+                if (!in.getLocalStrategyKeys().isValidUnorderedPrefix(this.keys)) {
+                    throw new RuntimeException("Bug: Inconsistent sort for group strategy.");
+                }
+                in.setLocalStrategy(
+                        LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+            }
+
+            final String forwardedName = "Reduce(" + node.getOperator().getName() + ")";
+            return new SingleInputPlanNode(
+                    node, forwardedName, in, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+        }
+
+        // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
+        final Channel combinerInput = new Channel(in.getSource());
+        combinerInput.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+
+        // the combiner runs with the same parallelism as the source of the input
+        final GroupReduceNode combinerUtilityNode = ((GroupReduceNode) node).getCombinerUtilityNode();
+        combinerUtilityNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+        final String combinerName = "Combine(" + node.getOperator().getName() + ")";
+        final SingleInputPlanNode combinerPlanNode = new SingleInputPlanNode(
+                combinerUtilityNode, combinerName, combinerInput, DriverStrategy.SORTED_GROUP_COMBINE);
+        combinerPlanNode.setCosts(new Costs(0, 0));
+        combinerPlanNode.initProperties(combinerInput.getGlobalProperties(), combinerInput.getLocalProperties());
+        // slot 0: sorting comparator key info
+        combinerPlanNode.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+        // slot 1: grouping comparator key info
+        combinerPlanNode.setDriverKeyInfo(this.keyList, 1);
+
+        // the channel into the reducer takes over the ship strategy of the original input
+        final Channel reducerInput = new Channel(combinerPlanNode);
+        reducerInput.setShipStrategy(
+                in.getShipStrategy(), in.getShipStrategyKeys(), in.getShipStrategySortOrder(), in.getDataExchangeMode());
+        reducerInput.setLocalStrategy(
+                LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+
+        final String reducerName = "Reduce (" + node.getOperator().getName() + ")";
+        return new SingleInputPlanNode(
+                node, reducerName, reducerInput, DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
+    }
+
+    /**
+     * @return a single-element list: a custom partitioning on the keys when a custom
+     *         partitioner is set, otherwise "any partitioning" on the keys
+     */
+    @Override
+    protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+        final RequestedGlobalProperties requested = new RequestedGlobalProperties();
+
+        if (this.customPartitioner != null) {
+            requested.setCustomPartitioned(this.keys, this.customPartitioner);
+        } else {
+            requested.setAnyPartitioning(this.keys);
+        }
+        return Collections.singletonList(requested);
+    }
+
+    /**
+     * @return a single-element list: the combined ordering when one was built,
+     *         otherwise a grouping on the keys
+     */
+    @Override
+    protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+        final RequestedLocalProperties requested = new RequestedLocalProperties();
+
+        if (this.ordering != null) {
+            requested.setOrdering(this.ordering);
+        } else {
+            requested.setGroupedFields(this.keys);
+        }
+        return Collections.singletonList(requested);
+    }
+
+    /**
+     * Adjusts the given global properties in place and returns them.
+     *
+     * <p>If the input has at least one unique field combination and is randomly
+     * partitioned, the first unique field combination is recorded as an "any partitioning"
+     * on those fields. In every case the unique field combinations are cleared afterwards.
+     *
+     * @param gProps the global properties of the input; this instance is modified
+     * @return the same, modified {@code gProps} instance
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+        final boolean hasUniqueFields = gProps.getUniqueFieldCombination() != null
+                && gProps.getUniqueFieldCombination().size() > 0;
+
+        if (hasUniqueFields && gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED) {
+            gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList());
+        }
+
+        gProps.clearUniqueFieldCombinations();
+        return gProps;
+    }
+
+    /**
+     * @param lProps the local properties of the input
+     * @return the result of {@code lProps.clearUniqueFieldSets()}
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties lProps) {
+        return lProps.clearUniqueFieldSets();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
new file mode 100644
index 0000000..fec72a9
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildFirstProperties.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Join descriptor that selects the driver strategy
+ * {@link DriverStrategy#HYBRIDHASH_BUILD_FIRST}, or
+ * {@link DriverStrategy#HYBRIDHASH_BUILD_FIRST_CACHED} when only the second input lies on
+ * the dynamic path. It places no requirements on the local properties of its inputs and
+ * produces default (empty) local properties.
+ */
+public class HashJoinBuildFirstProperties extends AbstractJoinDescriptor {
+
+    /**
+     * Creates a descriptor with the parent's default broadcast/repartition settings.
+     *
+     * @param keys1 the join keys of the first input
+     * @param keys2 the join keys of the second input
+     */
+    public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2) {
+        super(keys1, keys2);
+    }
+
+    /**
+     * Creates a descriptor, handing all flags on to the parent descriptor.
+     *
+     * @param keys1 the join keys of the first input
+     * @param keys2 the join keys of the second input
+     * @param broadcastFirstAllowed broadcast flag for the first input
+     * @param broadcastSecondAllowed broadcast flag for the second input
+     * @param repartitionAllowed repartitioning flag
+     */
+    public HashJoinBuildFirstProperties(FieldList keys1, FieldList keys2,
+            boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+    {
+        super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+    }
+
+    /**
+     * @return always {@link DriverStrategy#HYBRIDHASH_BUILD_FIRST}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.HYBRIDHASH_BUILD_FIRST;
+    }
+
+    /**
+     * @return a single-element list pairing two default-constructed requested local
+     *         properties, i.e. all properties are possible
+     */
+    @Override
+    protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+        final RequestedLocalProperties first = new RequestedLocalProperties();
+        final RequestedLocalProperties second = new RequestedLocalProperties();
+        return Collections.singletonList(new LocalPropertiesPair(first, second));
+    }
+
+    /**
+     * @param requested1 requested local properties of the first input (not consulted)
+     * @param requested2 requested local properties of the second input (not consulted)
+     * @param produced1 produced local properties of the first input (not consulted)
+     * @param produced2 produced local properties of the second input (not consulted)
+     * @return always {@code true}
+     */
+    @Override
+    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+            LocalProperties produced1, LocalProperties produced2)
+    {
+        return true;
+    }
+
+    /**
+     * Creates the join plan node.
+     *
+     * <p>When the first input is not on the dynamic path but the second one is, the cache
+     * on the first input's channel is removed and the cached variant of the driver strategy
+     * is used instead.
+     *
+     * @param in1 the channel of the first input; its temp mode may be changed
+     * @param in2 the channel of the second input
+     * @param node the optimizer node the plan node is created for
+     * @return a plan node named {@code "Join(<operator name>)"}
+     * @throws CompilerException if the static first input meets the dynamic second input
+     *                           and the first input's temp mode is not cached
+     */
+    @Override
+    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+        DriverStrategy chosen = DriverStrategy.HYBRIDHASH_BUILD_FIRST;
+
+        final boolean staticMeetsDynamic = !in1.isOnDynamicPath() && in2.isOnDynamicPath();
+        if (staticMeetsDynamic) {
+            // sanity check that the first input is cached and remove that cache
+            if (!in1.getTempMode().isCached()) {
+                throw new CompilerException("No cache at point where static and dynamic parts meet.");
+            }
+            in1.setTempMode(in1.getTempMode().makeNonCached());
+            chosen = DriverStrategy.HYBRIDHASH_BUILD_FIRST_CACHED;
+        }
+
+        final String planNodeName = "Join(" + node.getOperator().getName() + ")";
+        return new DualInputPlanNode(node, planNodeName, in1, in2, chosen, this.keys1, this.keys2);
+    }
+
+    /**
+     * @param in1 local properties of the first input (not consulted)
+     * @param in2 local properties of the second input (not consulted)
+     * @return a new, default-constructed {@link LocalProperties} instance
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+        return new LocalProperties();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
new file mode 100644
index 0000000..f9d1e6c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashJoinBuildSecondProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public final class HashJoinBuildSecondProperties extends AbstractJoinDescriptor { // join descriptor producing HYBRIDHASH_BUILD_SECOND or HYBRIDHASH_BUILD_SECOND_CACHED plan nodes
+
+ public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2) { // both key lists are passed straight to the superclass
+ super(keys1, keys2);
+ }
+
+ public HashJoinBuildSecondProperties(FieldList keys1, FieldList keys2,
+ boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+ { // all five arguments are forwarded unchanged to the superclass constructor
+ super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+ }
+
+ @Override // reports the (non-cached) driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.HYBRIDHASH_BUILD_SECOND;
+ }
+
+ @Override // offers exactly one pair of requested local properties for the two inputs
+ protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+ // all properties are possible: both sides of the pair are default-constructed requests
+ return Collections.singletonList(new LocalPropertiesPair(
+ new RequestedLocalProperties(), new RequestedLocalProperties()));
+ }
+
+ @Override // co-fulfillment check over the local properties of both inputs
+ public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+ LocalProperties produced1, LocalProperties produced2)
+ {
+ return true; // always accepted; none of the four arguments is inspected
+ }
+
+ @Override // builds the join plan node, choosing the plain or the cached build-second strategy
+ public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+ DriverStrategy strategy;
+
+ if (!in2.isOnDynamicPath() && in1.isOnDynamicPath()) { // second input off the dynamic path, first input on it
+ // sanity check that the second input is cached and remove that cache
+ if (!in2.getTempMode().isCached()) {
+ throw new CompilerException("No cache at point where static and dynamic parts meet.");
+ }
+
+ in2.setTempMode(in2.getTempMode().makeNonCached()); // side effect: the temp mode of in2 is replaced
+ strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND_CACHED;
+ }
+ else {
+ strategy = DriverStrategy.HYBRIDHASH_BUILD_SECOND;
+ }
+ return new DualInputPlanNode(node, "Join ("+node.getOperator().getName()+")", in1, in2, strategy, this.keys1, this.keys2);
+ }
+
+ @Override // local properties reported for the join output
+ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+ return new LocalProperties(); // neither input's properties are read; a default-constructed instance is returned
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
new file mode 100644
index 0000000..9f14d2a
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class MapDescriptor extends OperatorDescriptorSingle { // single-input descriptor for DriverStrategy.MAP
+
+ @Override // reports the driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.MAP;
+ }
+
+ @Override // plan node is labelled "Map (<operator name>)" and reads from channel 'in'
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ return new SingleInputPlanNode(node, "Map ("+node.getOperator().getName()+")", in, DriverStrategy.MAP);
+ }
+
+ @Override // single option: a request on which setAnyDistribution() has been called
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setAnyDistribution();
+ return Collections.singletonList(rgp);
+ }
+
+ @Override // single option: a default-constructed RequestedLocalProperties
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ return Collections.singletonList(new RequestedLocalProperties());
+ }
+
+ @Override // global properties after the operator
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ return gProps; // handed back as-is (same instance, no copy)
+ }
+
+ @Override // local properties after the operator
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps; // handed back as-is (same instance, no copy)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
new file mode 100644
index 0000000..1489097
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/MapPartitionDescriptor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class MapPartitionDescriptor extends OperatorDescriptorSingle { // single-input descriptor for DriverStrategy.MAP_PARTITION
+
+ @Override // reports the driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.MAP_PARTITION;
+ }
+
+ @Override // plan node is labelled "MapPartition (<operator name>)" and reads from channel 'in'
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ return new SingleInputPlanNode(node, "MapPartition ("+node.getOperator().getName()+")", in, DriverStrategy.MAP_PARTITION);
+ }
+
+ @Override // single option: a request on which setAnyDistribution() has been called
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ RequestedGlobalProperties rgp = new RequestedGlobalProperties();
+ rgp.setAnyDistribution();
+ return Collections.singletonList(rgp);
+ }
+
+ @Override // single option: a default-constructed RequestedLocalProperties
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ return Collections.singletonList(new RequestedLocalProperties());
+ }
+
+ @Override // global properties after the operator
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ return gProps; // handed back as-is (same instance, no copy)
+ }
+
+ @Override // local properties after the operator
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps; // handed back as-is (same instance, no copy)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
new file mode 100644
index 0000000..7ae35c3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/NoOpDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+
+public class NoOpDescriptor extends OperatorDescriptorSingle { // single-input descriptor for DriverStrategy.UNARY_NO_OP
+
+ @Override // reports the driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.UNARY_NO_OP;
+ }
+
+ @Override // plan node always carries the fixed label "Pipe"; the operator name is not used
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ return new SingleInputPlanNode(node, "Pipe", in, DriverStrategy.UNARY_NO_OP);
+ }
+
+
+ @Override // single option: a default-constructed RequestedGlobalProperties
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ return Collections.singletonList(new RequestedGlobalProperties());
+ }
+
+
+ @Override // single option: a default-constructed RequestedLocalProperties
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ return Collections.singletonList(new RequestedLocalProperties());
+ }
+
+
+ @Override // global properties after the operator
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ return gProps; // handed back as-is (same instance, no copy)
+ }
+
+
+ @Override // local properties after the operator
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps; // handed back as-is (same instance, no copy)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
new file mode 100644
index 0000000..c21593e
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorDual.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+
+/**
+ * Base class for operator descriptors with two inputs: holds both key field lists and lazily caches the possible property pairs.
+ */
+public abstract class OperatorDescriptorDual implements AbstractOperatorDescriptor {
+
+ protected final FieldList keys1; // key fields of the first input (null when the no-arg constructor is used)
+ protected final FieldList keys2; // key fields of the second input (null when the no-arg constructor is used)
+
+ private List<GlobalPropertiesPair> globalProps; // cache, filled on the first getPossibleGlobalProperties() call
+ private List<LocalPropertiesPair> localProps; // cache, filled on the first getPossibleLocalProperties() call
+
+ protected OperatorDescriptorDual() { // descriptor without keys: both key lists are null
+ this(null, null);
+ }
+
+ protected OperatorDescriptorDual(FieldList keys1, FieldList keys2) { // stores the key lists as given (no copy)
+ this.keys1 = keys1;
+ this.keys2 = keys2;
+ }
+
+ public List<GlobalPropertiesPair> getPossibleGlobalProperties() { // created on first call, cached afterwards
+ if (this.globalProps == null) {
+ this.globalProps = createPossibleGlobalProperties();
+ }
+
+ return this.globalProps;
+ }
+
+ public List<LocalPropertiesPair> getPossibleLocalProperties() { // created on first call, cached afterwards
+ if (this.localProps == null) {
+ this.localProps = createPossibleLocalProperties();
+ }
+
+ return this.localProps;
+ }
+
+ protected abstract List<GlobalPropertiesPair> createPossibleGlobalProperties(); // supplies the list cached by getPossibleGlobalProperties()
+
+ protected abstract List<LocalPropertiesPair> createPossibleLocalProperties(); // supplies the list cached by getPossibleLocalProperties()
+
+ public abstract boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+ GlobalProperties produced1, GlobalProperties produced2);
+
+ public abstract boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+ LocalProperties produced1, LocalProperties produced2);
+
+ public abstract DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node);
+
+ public abstract GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2);
+
+ public abstract LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2);
+
+ protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2) { // variant that compares all fields; lists of different size never match
+
+ // check number of produced partitioning fields
+ if(fields1.size() != fields2.size()) {
+ return false;
+ } else {
+ return checkEquivalentFieldPositionsInKeyFields(fields1, fields2, fields1.size());
+ }
+ }
+
+ protected boolean checkEquivalentFieldPositionsInKeyFields(FieldList fields1, FieldList fields2, int numRelevantFields) { // true if, for each of the first numRelevantFields positions, both fields sit at one common index of keys1/keys2
+
+ // check number of produced partitioning fields
+ if(fields1.size() < numRelevantFields || fields2.size() < numRelevantFields) {
+ return false;
+ }
+ else {
+ for(int i=0; i<numRelevantFields; i++) {
+ int pField1 = fields1.get(i);
+ int pField2 = fields2.get(i);
+ // check if position of both produced fields is the same in both requested fields
+ int j;
+ for(j=0; j<this.keys1.size(); j++) {
+ if(this.keys1.get(j) == pField1 && this.keys2.get(j) == pField2) {
+ break; // both fields found at key position j
+ }
+ else if(this.keys1.get(j) != pField1 && this.keys2.get(j) != pField2) {
+ // do nothing
+ }
+ else {
+ return false; // exactly one of the two fields matches at position j
+ }
+ }
+ if(j == this.keys1.size()) { // loop ran to the end without a match
+ throw new CompilerException("Fields were not found in key fields.");
+ }
+ }
+ }
+ return true;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final class GlobalPropertiesPair { // immutable pair of requested global properties, one per input; either may be null
+
+ private final RequestedGlobalProperties props1, props2;
+
+ public GlobalPropertiesPair(RequestedGlobalProperties props1, RequestedGlobalProperties props2) {
+ this.props1 = props1;
+ this.props2 = props2;
+ }
+
+ public RequestedGlobalProperties getProperties1() {
+ return this.props1;
+ }
+
+ public RequestedGlobalProperties getProperties2() {
+ return this.props2;
+ }
+
+ @Override // XOR of both hash codes, a null side contributes 0
+ public int hashCode() {
+ return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
+ }
+
+ @Override // equal only to another GlobalPropertiesPair whose two sides are equal (or both null)
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == GlobalPropertiesPair.class) { // null guard: equals(null) must return false, not throw
+ final GlobalPropertiesPair other = (GlobalPropertiesPair) obj;
+
+ return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) &&
+ (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2));
+ }
+ return false;
+ }
+
+ @Override // rendered as "{props1 / props2}"
+ public String toString() {
+ return "{" + this.props1 + " / " + this.props2 + "}";
+ }
+ }
+
+ public static final class LocalPropertiesPair { // immutable pair of requested local properties, one per input; either may be null
+
+ private final RequestedLocalProperties props1, props2;
+
+ public LocalPropertiesPair(RequestedLocalProperties props1, RequestedLocalProperties props2) {
+ this.props1 = props1;
+ this.props2 = props2;
+ }
+
+ public RequestedLocalProperties getProperties1() {
+ return this.props1;
+ }
+
+ public RequestedLocalProperties getProperties2() {
+ return this.props2;
+ }
+
+ @Override // XOR of both hash codes, a null side contributes 0
+ public int hashCode() {
+ return (this.props1 == null ? 0 : this.props1.hashCode()) ^ (this.props2 == null ? 0 : this.props2.hashCode());
+ }
+
+ @Override // equal only to another LocalPropertiesPair whose two sides are equal (or both null)
+ public boolean equals(Object obj) {
+ if (obj != null && obj.getClass() == LocalPropertiesPair.class) { // null guard: equals(null) must return false, not throw
+ final LocalPropertiesPair other = (LocalPropertiesPair) obj;
+
+ return (this.props1 == null ? other.props1 == null : this.props1.equals(other.props1)) &&
+ (this.props2 == null ? other.props2 == null : this.props2.equals(other.props2));
+ }
+ return false;
+ }
+
+ @Override // rendered as "{props1 / props2}"
+ public String toString() {
+ return "{" + this.props1 + " / " + this.props2 + "}";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
new file mode 100644
index 0000000..c8be5d4
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/OperatorDescriptorSingle.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+
+/**
+ * Abstract base class for Operator descriptions which instantiates the node and sets the driver
+ * strategy and the sorting and grouping keys. Returns possible local and global properties and
+ * updates them after the operation has been performed.
+ * @see org.apache.flink.optimizer.dag.SingleInputNode
+ */
+public abstract class OperatorDescriptorSingle implements AbstractOperatorDescriptor {
+
+ protected final FieldSet keys; // the set of key fields
+ protected final FieldList keyList; // the key fields with ordered field positions
+
+ private List<RequestedGlobalProperties> globalProps; // cache, filled on the first getPossibleGlobalProperties() call
+ private List<RequestedLocalProperties> localProps; // cache, filled on the first getPossibleLocalProperties() call
+
+
+ protected OperatorDescriptorSingle() { // descriptor without keys: keys and keyList are both null
+ this(null);
+ }
+
+ protected OperatorDescriptorSingle(FieldSet keys) { // keyList is derived via keys.toFieldList(), or null when keys is null
+ this.keys = keys;
+ this.keyList = keys == null ? null : keys.toFieldList();
+ }
+
+
+ public List<RequestedGlobalProperties> getPossibleGlobalProperties() { // created on first call, cached afterwards
+ if (this.globalProps == null) {
+ this.globalProps = createPossibleGlobalProperties();
+ }
+ return this.globalProps;
+ }
+
+ public List<RequestedLocalProperties> getPossibleLocalProperties() { // created on first call, cached afterwards
+ if (this.localProps == null) {
+ this.localProps = createPossibleLocalProperties();
+ }
+ return this.localProps;
+ }
+
+ /**
+ * Returns a list of global properties that are required by this operator descriptor.
+ *
+ * @return A list of global properties that are required by this operator descriptor.
+ */
+ protected abstract List<RequestedGlobalProperties> createPossibleGlobalProperties();
+
+ /**
+ * Returns a list of local properties that are required by this operator descriptor.
+ *
+ * @return A list of local properties that are required by this operator descriptor.
+ */
+ protected abstract List<RequestedLocalProperties> createPossibleLocalProperties();
+
+ public abstract SingleInputPlanNode instantiate(Channel in, SingleInputNode node); // builds the plan node for 'node' fed by channel 'in'
+
+ /**
+ * Returns the global properties which are present after the operator was applied on the
+ * provided global properties.
+ *
+ * @param in The global properties on which the operator is applied.
+ * @return The global properties which are valid after the operator has been applied.
+ */
+ public abstract GlobalProperties computeGlobalProperties(GlobalProperties in);
+
+ /**
+ * Returns the local properties which are present after the operator was applied on the
+ * provided local properties.
+ *
+ * @param in The local properties on which the operator is applied.
+ * @return The local properties which are valid after the operator has been applied.
+ */
+ public abstract LocalProperties computeLocalProperties(LocalProperties in);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
new file mode 100644
index 0000000..2bde29b
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/PartialGroupProperties.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+public final class PartialGroupProperties extends OperatorDescriptorSingle { // single-input descriptor for DriverStrategy.SORTED_GROUP_COMBINE
+
+ public PartialGroupProperties(FieldSet keys) { // keys are handed to the superclass, which also derives keyList from them
+ super(keys);
+ }
+
+ @Override // reports the driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.SORTED_GROUP_COMBINE;
+ }
+
+ @Override // builds a "Combine(<operator name>)" plan node around a freshly created GroupReduceNode
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ // create an input node for combine with same DOP as input node
+ GroupReduceNode combinerNode = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) node.getOperator()); // cast fails at runtime if the operator is not a GroupReduceOperatorBase
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+ SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode, "Combine("+node.getOperator().getName()+")", in,
+ DriverStrategy.SORTED_GROUP_COMBINE);
+ // sorting key info (slot 0): taken from the channel's local strategy keys and sort order
+ combiner.setDriverKeyInfo(in.getLocalStrategyKeys(), in.getLocalStrategySortOrder(), 0);
+ // set grouping comparator key info (slot 1): this descriptor's keyList
+ combiner.setDriverKeyInfo(this.keyList, 1);
+
+ return combiner;
+ }
+
+ @Override // single option: a default-constructed RequestedGlobalProperties
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ return Collections.singletonList(new RequestedGlobalProperties());
+ }
+
+ @Override // single option: a request for grouping on this descriptor's keys
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ RequestedLocalProperties props = new RequestedLocalProperties();
+ props.setGroupedFields(this.keys);
+ return Collections.singletonList(props);
+ }
+
+ @Override // note: modifies and returns the passed-in gProps instance
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+ gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
+ {
+ gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); // uses the first unique field combination the iterator yields
+ }
+ gProps.clearUniqueFieldCombinations(); // cleared in every case, not only inside the branch above
+ return gProps;
+ }
+
+ @Override // returns whatever clearUniqueFieldSets() yields for the given local properties
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps.clearUniqueFieldSets();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
new file mode 100644
index 0000000..5bb51f3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.operators.util.FieldSet;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.ReduceNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.io.network.DataExchangeMode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+
+public final class ReduceProperties extends OperatorDescriptorSingle { // single-input descriptor for DriverStrategy.SORTED_REDUCE, optionally preceded by a combiner
+
+ private final Partitioner<?> customPartitioner; // null means "no custom partitioner" (see createPossibleGlobalProperties)
+
+ public ReduceProperties(FieldSet keys) { // convenience constructor: no custom partitioner
+ this(keys, null);
+ }
+
+ public ReduceProperties(FieldSet keys, Partitioner<?> customPartitioner) { // customPartitioner may be null
+ super(keys);
+ this.customPartitioner = customPartitioner;
+ }
+
+ @Override // reports the driver strategy constant for this descriptor
+ public DriverStrategy getStrategy() {
+ return DriverStrategy.SORTED_REDUCE;
+ }
+
+ @Override // builds the reduce plan node; in the non-forward case without broadcast inputs a combiner node is inserted in front
+ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+ if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
+ (node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
+ { // forward shipping, or the node has broadcast connections: reduce directly on 'in', no combiner
+ return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
+ DriverStrategy.SORTED_REDUCE, this.keyList);
+ }
+ else {
+ // non forward case. all local properties are killed anyways, so we can safely plug in a combiner
+ Channel toCombiner = new Channel(in.getSource());
+ toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
+
+ // create an input node for combine with same DOP as input node
+ ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode(); // cast fails at runtime if node is not a ReduceNode
+ combinerNode.setDegreeOfParallelism(in.getSource().getParallelism());
+
+ SingleInputPlanNode combiner = new SingleInputPlanNode(combinerNode,
+ "Combine ("+node.getOperator().getName()+")", toCombiner,
+ DriverStrategy.SORTED_PARTIAL_REDUCE, this.keyList);
+
+ combiner.setCosts(new Costs(0, 0)); // the combiner is assigned a Costs object built from two zeros
+ combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
+
+ Channel toReducer = new Channel(combiner); // new channel takes over the ship strategy settings of the original 'in'
+ toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
+ in.getShipStrategySortOrder(), in.getDataExchangeMode());
+ toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
+
+ return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
+ DriverStrategy.SORTED_REDUCE, this.keyList);
+ }
+ }
+
+ @Override // single option: partitioning on the keys, custom-partitioned when a partitioner was supplied
+ protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+ RequestedGlobalProperties props = new RequestedGlobalProperties();
+ if (customPartitioner == null) {
+ props.setAnyPartitioning(this.keys);
+ } else {
+ props.setCustomPartitioned(this.keys, this.customPartitioner);
+ }
+ return Collections.singletonList(props);
+ }
+
+ @Override // single option: a request for grouping on this descriptor's keys
+ protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+ RequestedLocalProperties props = new RequestedLocalProperties();
+ props.setGroupedFields(this.keys);
+ return Collections.singletonList(props);
+ }
+
+ @Override // note: modifies and returns the passed-in gProps instance
+ public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+ if (gProps.getUniqueFieldCombination() != null && gProps.getUniqueFieldCombination().size() > 0 &&
+ gProps.getPartitioning() == PartitioningProperty.RANDOM_PARTITIONED)
+ {
+ gProps.setAnyPartitioning(gProps.getUniqueFieldCombination().iterator().next().toFieldList()); // uses the first unique field combination the iterator yields
+ }
+ gProps.clearUniqueFieldCombinations(); // cleared in every case, not only inside the branch above
+ return gProps;
+ }
+
+ @Override // returns whatever clearUniqueFieldSets() yields for the given local properties
+ public LocalProperties computeLocalProperties(LocalProperties lProps) {
+ return lProps.clearUniqueFieldSets();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
new file mode 100644
index 0000000..1dcd87d
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SolutionSetDeltaOperator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Operator descriptor for the solution set delta. It is a unary no-op
+ * ({@link DriverStrategy#UNARY_NO_OP}) that requests its input to be hash
+ * partitioned on the key list and places no requirements on local properties.
+ * Global and local properties are passed through unchanged.
+ */
+public class SolutionSetDeltaOperator extends OperatorDescriptorSingle {
+
+    /**
+     * @param partitioningFields the fields handed to the {@link OperatorDescriptorSingle}
+     *                           constructor (presumably what ends up in {@code keyList};
+     *                           confirm against the superclass)
+     */
+    public SolutionSetDeltaOperator(FieldList partitioningFields) {
+        super(partitioningFields);
+    }
+
+    /**
+     * @return always {@link DriverStrategy#UNARY_NO_OP}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.UNARY_NO_OP;
+    }
+
+    /**
+     * Creates the plan node named "SolutionSet Delta" for the given input channel.
+     *
+     * @param in   the input channel of the plan node
+     * @param node the optimizer node the plan node is created for
+     * @return a new single-input plan node using the unary no-op strategy
+     */
+    @Override
+    public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+        final SingleInputPlanNode deltaNode =
+                new SingleInputPlanNode(node, "SolutionSet Delta", in, DriverStrategy.UNARY_NO_OP);
+        return deltaNode;
+    }
+
+    /**
+     * @return a singleton list with a request for hash partitioning on the key list
+     */
+    @Override
+    protected List<RequestedGlobalProperties> createPossibleGlobalProperties() {
+        final RequestedGlobalProperties hashPartitioned = new RequestedGlobalProperties();
+        hashPartitioned.setHashPartitioned(this.keyList);
+        return Collections.singletonList(hashPartitioned);
+    }
+
+    /**
+     * @return a singleton list with a default-constructed (unconfigured) local-property request
+     */
+    @Override
+    protected List<RequestedLocalProperties> createPossibleLocalProperties() {
+        final RequestedLocalProperties unconstrained = new RequestedLocalProperties();
+        return Collections.singletonList(unconstrained);
+    }
+
+    /**
+     * @param gProps the global properties of the input
+     * @return the given properties, unchanged
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties gProps) {
+        return gProps;
+    }
+
+    /**
+     * @param lProps the local properties of the input
+     * @return the given properties, unchanged
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties lProps) {
+        return lProps;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
new file mode 100644
index 0000000..356836a
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/SortMergeJoinDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.util.Utils;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Join descriptor for the sort-merge strategy ({@link DriverStrategy#MERGE}).
+ * Each input is requested to be ordered on its own join keys, and the orderings
+ * actually produced by both inputs must agree in field positions and directions.
+ */
+public class SortMergeJoinDescriptor extends AbstractJoinDescriptor {
+
+    /**
+     * @param keys1 the key fields of the first input
+     * @param keys2 the key fields of the second input
+     */
+    public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2) {
+        super(keys1, keys2);
+    }
+
+    /**
+     * Variant that forwards the three shipping-strategy flags to {@link AbstractJoinDescriptor}.
+     *
+     * @param keys1                  the key fields of the first input
+     * @param keys2                  the key fields of the second input
+     * @param broadcastFirstAllowed  forwarded unchanged to the superclass
+     * @param broadcastSecondAllowed forwarded unchanged to the superclass
+     * @param repartitionAllowed     forwarded unchanged to the superclass
+     */
+    public SortMergeJoinDescriptor(FieldList keys1, FieldList keys2,
+            boolean broadcastFirstAllowed, boolean broadcastSecondAllowed, boolean repartitionAllowed)
+    {
+        super(keys1, keys2, broadcastFirstAllowed, broadcastSecondAllowed, repartitionAllowed);
+    }
+
+    /**
+     * @return always {@link DriverStrategy#MERGE}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.MERGE;
+    }
+
+    /**
+     * @return a singleton list with one pair of requests: an ordering built from
+     *         {@code keys1} for the first input and one built from {@code keys2} for the second
+     */
+    @Override
+    protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+        RequestedLocalProperties sort1 = new RequestedLocalProperties(Utils.createOrdering(this.keys1));
+        RequestedLocalProperties sort2 = new RequestedLocalProperties(Utils.createOrdering(this.keys2));
+        return Collections.singletonList(new LocalPropertiesPair(sort1, sort2));
+    }
+
+    /**
+     * Checks whether the orderings produced by the two inputs fit together for a merge.
+     * The requested properties are not consulted here; only the produced orderings are compared.
+     *
+     * @param requested1 the properties requested from the first input (unused)
+     * @param requested2 the properties requested from the second input (unused)
+     * @param produced1  the local properties produced by the first input
+     * @param produced2  the local properties produced by the second input
+     * @return {@code true} if the field positions are equivalent and the sort directions of
+     *         the first {@code keys1.size()} fields match, {@code false} otherwise
+     * @throws CompilerException if either input has no ordering
+     */
+    @Override
+    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+            LocalProperties produced1, LocalProperties produced2)
+    {
+        int numRelevantFields = this.keys1.size();
+
+        Ordering prod1 = produced1.getOrdering();
+        Ordering prod2 = produced2.getOrdering();
+
+        if (prod1 == null || prod2 == null) {
+            throw new CompilerException("The given properties do not meet this operators requirements.");
+        }
+
+        // check that order of fields is equivalent
+        if (!checkEquivalentFieldPositionsInKeyFields(
+                prod1.getInvolvedIndexes(), prod2.getInvolvedIndexes(), numRelevantFields)) {
+            return false;
+        }
+
+        // check that both inputs have the same directions of order
+        for (int i = 0; i < numRelevantFields; i++) {
+            if (prod1.getOrder(i) != prod2.getOrder(i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Creates the merge-join plan node. The sort directions are taken from the ordering of
+     * the first input and cut down to the number of key fields if there are more.
+     *
+     * @param in1  the channel of the first input
+     * @param in2  the channel of the second input
+     * @param node the optimizer node the plan node is created for
+     * @return a new dual-input plan node using {@link DriverStrategy#MERGE}
+     * @throws CompilerException if the first input has no ordering, or its ordering describes
+     *                           fewer sort directions than there are key fields
+     */
+    @Override
+    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+        // getOrdering() may be null (areCoFulfilled guards against that as well). Map a missing
+        // ordering to null sort directions so that the check below reports the problem as a
+        // CompilerException instead of failing with a NullPointerException.
+        Ordering inputOrdering = in1.getLocalProperties().getOrdering();
+        boolean[] inputOrders = inputOrdering == null ? null : inputOrdering.getFieldSortDirections();
+
+        if (inputOrders == null || inputOrders.length < this.keys1.size()) {
+            throw new CompilerException("BUG: The input strategy does not sufficiently describe the sort orders for a merge operator.");
+        } else if (inputOrders.length > this.keys1.size()) {
+            // keep only the leading directions, one per key field
+            boolean[] tmp = new boolean[this.keys1.size()];
+            System.arraycopy(inputOrders, 0, tmp, 0, tmp.length);
+            inputOrders = tmp;
+        }
+
+        return new DualInputPlanNode(node, "Join("+node.getOperator().getName()+")", in1, in2, DriverStrategy.MERGE, this.keys1, this.keys2, inputOrders);
+    }
+
+    /**
+     * @param in1 the local properties of the first input
+     * @param in2 the local properties of the second input
+     * @return the combination of both inputs' properties with the unique field sets cleared
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+        LocalProperties comb = LocalProperties.combine(in1, in2);
+        return comb.clearUniqueFieldSets();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
new file mode 100644
index 0000000..c42cff2
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/UtilSinkJoinOpDescriptor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.operators;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.dag.SinkJoiner;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dataproperties.GlobalProperties;
+import org.apache.flink.optimizer.dataproperties.LocalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedGlobalProperties;
+import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkJoinerPlanNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Operator descriptor used for {@link SinkJoiner} nodes. It is a binary no-op
+ * ({@link DriverStrategy#BINARY_NO_OP}) that accepts any global and local
+ * properties on both inputs.
+ */
+public class UtilSinkJoinOpDescriptor extends OperatorDescriptorDual {
+
+    /**
+     * @return always {@link DriverStrategy#BINARY_NO_OP}
+     */
+    @Override
+    public DriverStrategy getStrategy() {
+        return DriverStrategy.BINARY_NO_OP;
+    }
+
+    /**
+     * @return a singleton list with one pair of default-constructed global-property requests
+     */
+    @Override
+    protected List<GlobalPropertiesPair> createPossibleGlobalProperties() {
+        // all properties are possible
+        return Collections.singletonList(new GlobalPropertiesPair(
+            new RequestedGlobalProperties(), new RequestedGlobalProperties()));
+    }
+
+    /**
+     * @return a singleton list with one pair of default-constructed local-property requests
+     */
+    @Override
+    protected List<LocalPropertiesPair> createPossibleLocalProperties() {
+        // all properties are possible
+        return Collections.singletonList(new LocalPropertiesPair(
+            new RequestedLocalProperties(), new RequestedLocalProperties()));
+    }
+
+    /**
+     * @return always {@code true}; none of the arguments is inspected
+     */
+    @Override
+    public boolean areCompatible(RequestedGlobalProperties requested1, RequestedGlobalProperties requested2,
+            GlobalProperties produced1, GlobalProperties produced2) {
+        return true;
+    }
+
+    /**
+     * @return always {@code true}; none of the arguments is inspected
+     */
+    @Override
+    public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
+            LocalProperties produced1, LocalProperties produced2) {
+        return true;
+    }
+
+    /**
+     * Creates the plan node for a {@link SinkJoiner}.
+     *
+     * @param in1  the channel of the first input
+     * @param in2  the channel of the second input
+     * @param node the optimizer node; must be a {@link SinkJoiner}
+     * @return a new {@link SinkJoinerPlanNode} over the two inputs
+     * @throws CompilerException if {@code node} is not a {@link SinkJoiner}
+     */
+    @Override
+    public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {
+        if (node instanceof SinkJoiner) {
+            return new SinkJoinerPlanNode((SinkJoiner) node, in1, in2);
+        } else {
+            // name the offending type so the failure can be diagnosed from the message alone
+            String actualType = node == null ? "null" : node.getClass().getName();
+            throw new CompilerException(
+                "BUG: The sink join descriptor can only instantiate SinkJoiner nodes, but was given: " + actualType);
+        }
+    }
+
+    /**
+     * @return a new, default-constructed {@link LocalProperties}; the inputs' properties are not used
+     */
+    @Override
+    public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
+        return new LocalProperties();
+    }
+
+    /**
+     * @return the result of {@code GlobalProperties.combine(in1, in2)}
+     */
+    @Override
+    public GlobalProperties computeGlobalProperties(GlobalProperties in1, GlobalProperties in2) {
+        return GlobalProperties.combine(in1, in2);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
new file mode 100644
index 0000000..bf22fb3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BinaryUnionPlanNode.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.plan;
+
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * Plan node for the binary union. It exists as its own subclass of
+ * {@link DualInputPlanNode} so that union nodes can be identified by type.
+ */
+public class BinaryUnionPlanNode extends DualInputPlanNode {
+
+    /**
+     * Creates a plan node named "Union" with the {@link DriverStrategy#UNION} strategy.
+     *
+     * @param template the optimizer node this plan node is created for
+     * @param in1      the channel of the first input
+     * @param in2      the channel of the second input
+     */
+    public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) {
+        super(template, "Union", in1, in2, DriverStrategy.UNION);
+    }
+
+    /**
+     * Creates a plan node named "Union-With-Cached" from an existing union plan node, with
+     * the two inputs swapped and the {@link DriverStrategy#UNION_WITH_CACHED} strategy.
+     * Costs, global and local properties, and the parallelism are taken over from the
+     * given node.
+     *
+     * @param toSwapFrom the union plan node whose inputs are swapped
+     */
+    public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
+        super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(),
+                DriverStrategy.UNION_WITH_CACHED);
+
+        // take over the cost estimates of the original node
+        this.nodeCosts = toSwapFrom.nodeCosts;
+        this.cumulativeCosts = toSwapFrom.cumulativeCosts;
+
+        // take over the data properties of the original node
+        this.localProps = toSwapFrom.localProps;
+        this.globalProps = toSwapFrom.globalProps;
+
+        setParallelism(toSwapFrom.getParallelism());
+    }
+
+    /**
+     * @return the template of this plan node, cast to {@link BinaryUnionNode}
+     */
+    public BinaryUnionNode getOptimizerNode() {
+        return (BinaryUnionNode) this.template;
+    }
+
+    /**
+     * @return {@code true} if exactly one of the two inputs is on the dynamic path
+     */
+    public boolean unionsStaticAndDynamicPath() {
+        final boolean firstIsDynamic = getInput1().isOnDynamicPath();
+        final boolean secondIsDynamic = getInput2().isOnDynamicPath();
+        return firstIsDynamic != secondIsDynamic;
+    }
+
+    /**
+     * @return always {@code 0}
+     */
+    @Override
+    public int getMemoryConsumerWeight() {
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
new file mode 100644
index 0000000..e79e2f3
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plan/BulkIterationPlanNode.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.plan;
+
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE;
+import static org.apache.flink.optimizer.plan.PlanNode.SourceAndDamReport.FOUND_SOURCE_AND_DAM;
+
+import java.util.HashMap;
+
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.costs.Costs;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.util.Visitor;
+
+/**
+ * Plan node for a bulk iteration. Besides the regular single input, it holds the plan
+ * node of the partial solution, the root of the step function and, optionally, the
+ * root of the termination criterion.
+ */
+public class BulkIterationPlanNode extends SingleInputPlanNode implements IterationPlanNode {
+
+    /** The plan node of the partial solution, as handed to the constructor. */
+    private final BulkPartialSolutionPlanNode partialSolutionPlanNode;
+
+    /** The root plan node of the step function. */
+    private final PlanNode rootOfStepFunction;
+
+    /** The root plan node of the termination criterion; null unless set by the second constructor. */
+    private PlanNode rootOfTerminationCriterion;
+
+    /** The serializer factory for the iteration channel; null until the setter is called. */
+    private TypeSerializerFactory<?> serializerForIterationChannel;
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Creates the plan node without a termination criterion and merges the branch plan
+     * information of the step function into this node.
+     *
+     * @param template           the optimizer node this plan node is created for
+     * @param nodeName           the name of the plan node
+     * @param input              the input channel
+     * @param pspn               the plan node of the partial solution
+     * @param rootOfStepFunction the root plan node of the step function
+     * @throws CompilerException if the selected candidate for an open branch cannot be found
+     */
+    public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input,
+            BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction)
+    {
+        super(template, nodeName, input, DriverStrategy.NONE);
+        this.partialSolutionPlanNode = pspn;
+        this.rootOfStepFunction = rootOfStepFunction;
+
+        mergeBranchPlanMaps();
+    }
+
+    /**
+     * Same as the five-argument constructor, and additionally records the root of the
+     * termination criterion.
+     *
+     * @param template                   the optimizer node this plan node is created for
+     * @param nodeName                   the name of the plan node
+     * @param input                      the input channel
+     * @param pspn                       the plan node of the partial solution
+     * @param rootOfStepFunction         the root plan node of the step function
+     * @param rootOfTerminationCriterion the root plan node of the termination criterion
+     */
+    public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input,
+            BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction, PlanNode rootOfTerminationCriterion)
+    {
+        this(template, nodeName, input, pspn, rootOfStepFunction);
+        this.rootOfTerminationCriterion = rootOfTerminationCriterion;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * @return the template of this plan node, cast to {@link BulkIterationNode}
+     * @throws RuntimeException if the template is not a {@link BulkIterationNode}
+     */
+    public BulkIterationNode getIterationNode() {
+        if (this.template instanceof BulkIterationNode) {
+            return (BulkIterationNode) this.template;
+        } else {
+            // same exception type as before, now with a message that names the offending type
+            String actualType = this.template == null ? "null" : this.template.getClass().getName();
+            throw new RuntimeException(
+                "BUG: The template of a bulk iteration plan node must be a BulkIterationNode, but is: " + actualType);
+        }
+    }
+
+    /**
+     * @return the plan node of the partial solution
+     */
+    public BulkPartialSolutionPlanNode getPartialSolutionPlanNode() {
+        return this.partialSolutionPlanNode;
+    }
+
+    /**
+     * @return the root plan node of the step function
+     */
+    public PlanNode getRootOfStepFunction() {
+        return this.rootOfStepFunction;
+    }
+
+    /**
+     * @return the root plan node of the termination criterion, or null if none was set
+     */
+    public PlanNode getRootOfTerminationCriterion() {
+        return this.rootOfTerminationCriterion;
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * @return the serializer factory for the iteration channel, or null if it was not set
+     */
+    public TypeSerializerFactory<?> getSerializerForIterationChannel() {
+        return serializerForIterationChannel;
+    }
+
+    /**
+     * @param serializerForIterationChannel the serializer factory for the iteration channel
+     */
+    public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializerForIterationChannel) {
+        this.serializerForIterationChannel = serializerForIterationChannel;
+    }
+
+    /**
+     * Adds the cumulative costs of the step function and, if present, of the termination
+     * criterion to the given costs and then hands them to the superclass.
+     *
+     * @param nodeCosts the costs of this node; the object is modified in place
+     */
+    @Override
+    public void setCosts(Costs nodeCosts) {
+        // add the costs from the step function
+        nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts());
+
+        // add the costs for the termination criterion, if it exists
+        // the costs are divided at branches, so we can simply add them up
+        if (rootOfTerminationCriterion != null) {
+            nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts());
+        }
+
+        super.setCosts(nodeCosts);
+    }
+
+    /**
+     * @return always {@code 1}
+     */
+    @Override
+    public int getMemoryConsumerWeight() {
+        return 1;
+    }
+
+    /**
+     * Reports whether the given source is found below this node and whether there is a dam
+     * on the way. A source found through the regular input is always reported together
+     * with a dam; otherwise the step function is searched.
+     *
+     * @param source the plan node to search for
+     * @return {@code FOUND_SOURCE} if {@code source} is this node, {@code FOUND_SOURCE_AND_DAM}
+     *         if the superclass search finds the source, and otherwise the report of the
+     *         step function's root
+     */
+    @Override
+    public SourceAndDamReport hasDamOnPathDownTo(PlanNode source) {
+        if (source == this) {
+            return FOUND_SOURCE;
+        }
+
+        SourceAndDamReport fromOutside = super.hasDamOnPathDownTo(source);
+
+        if (fromOutside == FOUND_SOURCE_AND_DAM) {
+            return FOUND_SOURCE_AND_DAM;
+        }
+        else if (fromOutside == FOUND_SOURCE) {
+            // we always have a dam in the back channel
+            return FOUND_SOURCE_AND_DAM;
+        } else {
+            // check the step function for dams
+            return this.rootOfStepFunction.hasDamOnPathDownTo(source);
+        }
+    }
+
+    /**
+     * Passes the visitor to the root of the step function and then, if one is set, to the
+     * root of the termination criterion.
+     *
+     * @param visitor the visitor to apply
+     */
+    @Override
+    public void acceptForStepFunction(Visitor<PlanNode> visitor) {
+        this.rootOfStepFunction.accept(visitor);
+
+        if (this.rootOfTerminationCriterion != null) {
+            this.rootOfTerminationCriterion.accept(visitor);
+        }
+    }
+
+    /**
+     * For every open branch of the template that this node's branch plan does not cover yet,
+     * copies the selected candidate from the branch plan of the step function's root.
+     *
+     * @throws CompilerException if the step function's root has no candidate for such a branch
+     */
+    private void mergeBranchPlanMaps() {
+        for (OptimizerNode.UnclosedBranchDescriptor desc: template.getOpenBranches()) {
+            OptimizerNode brancher = desc.getBranchingNode();
+
+            // created lazily, so the map stays null when there are no open branches
+            if (branchPlan == null) {
+                branchPlan = new HashMap<OptimizerNode, PlanNode>(6);
+            }
+
+            if (!branchPlan.containsKey(brancher)) {
+                PlanNode selectedCandidate = null;
+
+                if (rootOfStepFunction.branchPlan != null) {
+                    selectedCandidate = rootOfStepFunction.branchPlan.get(brancher);
+                }
+
+                if (selectedCandidate == null) {
+                    throw new CompilerException(
+                        "Candidates for a node with open branches are missing information about the selected candidate ");
+                }
+
+                this.branchPlan.put(brancher, selectedCandidate);
+            }
+        }
+    }
+}