Posted to commits@flink.apache.org by se...@apache.org on 2015/03/20 11:06:49 UTC
[10/53] [abbrv] flink git commit: [optimizer] Rename optimizer
project to "flink-optimizer" (previously flink-compiler)
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
new file mode 100644
index 0000000..2d8377e
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/GenericFlatTypePostPass.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.postpass;
+
+import java.util.Map;
+
+import org.apache.flink.api.common.operators.SemanticProperties;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerPostPassException;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.SingleInputNode;
+import org.apache.flink.optimizer.dag.TwoInputNode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+
+/**
+ * Generic base class for post passes that traverse the optimized plan, infer a flat
+ * (position-based) field type schema for each plan node, and create the corresponding
+ * serializers and comparators.
+ */
+public abstract class GenericFlatTypePostPass<X, T extends AbstractSchema<X>> implements OptimizerPostPass {
+
+ private boolean propagateParentSchemaDown = true;
+
+
+ public boolean isPropagateParentSchemaDown() {
+ return propagateParentSchemaDown;
+ }
+
+ public void setPropagateParentSchemaDown(boolean propagateParentSchemaDown) {
+ this.propagateParentSchemaDown = propagateParentSchemaDown;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Generic schema inferring traversal
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void postPass(OptimizedPlan plan) {
+ for (SinkPlanNode sink : plan.getDataSinks()) {
+ traverse(sink, null, true);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ protected void traverse(PlanNode node, T parentSchema, boolean createUtilities) {
+ // distinguish the node types
+ if (node instanceof SinkPlanNode) {
+ SinkPlanNode sn = (SinkPlanNode) node;
+ Channel inchannel = sn.getInput();
+
+ T schema = createEmptySchema();
+ sn.postPassHelper = schema;
+
+ // add the sink's information to the schema
+ try {
+ getSinkSchema(sn, schema);
+ }
+ catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Conflicting type infomation for the data sink '" +
+ sn.getSinkNode().getOperator().getName() + "'.");
+ }
+
+ // descend to the input channel
+ try {
+ propagateToChannel(schema, inchannel, createUtilities);
+ }
+ catch (MissingFieldTypeInfoException ex) {
+ throw new CompilerPostPassException("Missing type infomation for the channel that inputs to the data sink '" +
+ sn.getSinkNode().getOperator().getName() + "'.");
+ }
+ }
+ else if (node instanceof SourcePlanNode) {
+ if (createUtilities) {
+ ((SourcePlanNode) node).setSerializer(createSerializer(parentSchema, node));
+ // nothing else to be done here. the source has no input and no strategy itself
+ }
+ }
+ else if (node instanceof BulkIterationPlanNode) {
+ BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
+
+ // get the node's current schema
+ T schema;
+ if (iterationNode.postPassHelper == null) {
+ schema = createEmptySchema();
+ iterationNode.postPassHelper = schema;
+ } else {
+ schema = (T) iterationNode.postPassHelper;
+ }
+ schema.increaseNumConnectionsThatContributed();
+
+ // add the parent schema to the schema
+ if (propagateParentSchemaDown) {
+ addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName());
+ }
+
+ // check whether all outgoing channels have contributed yet; if not, come back later
+ if (schema.getNumConnectionsThatContributed() < iterationNode.getOutgoingChannels().size()) {
+ return;
+ }
+
+ if (iterationNode.getRootOfStepFunction() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile an iteration step function where next partial solution is created by a Union node.");
+ }
+
+ // traverse the termination criterion for the first time. create schema only, no utilities. needed in case of an intermediate termination criterion
+ if (iterationNode.getRootOfTerminationCriterion() != null) {
+ SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
+ traverse(addMapper.getInput().getSource(), createEmptySchema(), false);
+ try {
+ addMapper.getInput().setSerializer(createSerializer(createEmptySchema()));
+ } catch (MissingFieldTypeInfoException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // traverse the step function for the first time. create schema only, no utilities
+ traverse(iterationNode.getRootOfStepFunction(), schema, false);
+
+ T pss = (T) iterationNode.getPartialSolutionPlanNode().postPassHelper;
+ if (pss == null) {
+ throw new CompilerException("Error in Optimizer Post Pass: Partial solution schema is null after first traversal of the step function.");
+ }
+
+ // traverse the step function for the second time, taking the schema of the partial solution
+ traverse(iterationNode.getRootOfStepFunction(), pss, createUtilities);
+
+ if (iterationNode.getRootOfTerminationCriterion() != null) {
+ SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
+ traverse(addMapper.getInput().getSource(), createEmptySchema(), createUtilities);
+ try {
+ addMapper.getInput().setSerializer(createSerializer(createEmptySchema()));
+ } catch (MissingFieldTypeInfoException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // take the schema from the partial solution node and add its fields to the iteration result schema.
+ // input and output schema need to be identical, so this is essentially a sanity check
+ addSchemaToSchema(pss, schema, iterationNode.getProgramOperator().getName());
+
+ // set the serializer
+ if (createUtilities) {
+ iterationNode.setSerializerForIterationChannel(createSerializer(pss, iterationNode.getPartialSolutionPlanNode()));
+ }
+
+ // done, we can now propagate our info down
+ try {
+ propagateToChannel(schema, iterationNode.getInput(), createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '"
+ + iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " +
+ e.getFieldNumber());
+ }
+ }
+ else if (node instanceof WorksetIterationPlanNode) {
+ WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
+
+ // get the node's current schema
+ T schema;
+ if (iterationNode.postPassHelper == null) {
+ schema = createEmptySchema();
+ iterationNode.postPassHelper = schema;
+ } else {
+ schema = (T) iterationNode.postPassHelper;
+ }
+ schema.increaseNumConnectionsThatContributed();
+
+ // add the parent schema to the schema (which refers to the solution set schema)
+ if (propagateParentSchemaDown) {
+ addSchemaToSchema(parentSchema, schema, iterationNode.getProgramOperator().getName());
+ }
+
+ // check whether all outgoing channels have contributed yet; if not, come back later
+ if (schema.getNumConnectionsThatContributed() < iterationNode.getOutgoingChannels().size()) {
+ return;
+ }
+ if (iterationNode.getNextWorkSetPlanNode() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile a workset iteration step function where the next workset is produced by a Union node.");
+ }
+ if (iterationNode.getSolutionSetDeltaPlanNode() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node.");
+ }
+
+ // traverse the step function
+ // pass an empty schema to the next workset and the parent schema to the solution set delta
+ // these first traversals are schema only
+ traverse(iterationNode.getNextWorkSetPlanNode(), createEmptySchema(), false);
+ traverse(iterationNode.getSolutionSetDeltaPlanNode(), schema, false);
+
+ T wss = (T) iterationNode.getWorksetPlanNode().postPassHelper;
+ T sss = (T) iterationNode.getSolutionSetPlanNode().postPassHelper;
+
+ if (wss == null) {
+ throw new CompilerException("Error in Optimizer Post Pass: Workset schema is null after first traversal of the step function.");
+ }
+ if (sss == null) {
+ throw new CompilerException("Error in Optimizer Post Pass: Solution set schema is null after first traversal of the step function.");
+ }
+
+ // make the second pass and instantiate the utilities
+ traverse(iterationNode.getNextWorkSetPlanNode(), wss, createUtilities);
+ traverse(iterationNode.getSolutionSetDeltaPlanNode(), sss, createUtilities);
+
+ // add the types from the solution set schema to the iteration's own schema. since
+ // the solution set input and the result must have the same schema, this acts as a sanity check.
+ try {
+ for (Map.Entry<Integer, X> entry : sss) {
+ Integer pos = entry.getKey();
+ schema.addType(pos, entry.getValue());
+ }
+ } catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
+ + " in node '" + iterationNode.getProgramOperator().getName() + "'. Contradicting types between the " +
+ "result of the iteration and the solution set schema: " + e.getPreviousType() +
+ " and " + e.getNewType() + ". Most probable cause: Invalid constant field annotations.");
+ }
+
+ // set the serializers and comparators
+ if (createUtilities) {
+ WorksetIterationNode optNode = iterationNode.getIterationNode();
+ iterationNode.setWorksetSerializer(createSerializer(wss, iterationNode.getWorksetPlanNode()));
+ iterationNode.setSolutionSetSerializer(createSerializer(sss, iterationNode.getSolutionSetPlanNode()));
+ try {
+ iterationNode.setSolutionSetComparator(createComparator(optNode.getSolutionSetKeyFields(), null, sss));
+ } catch (MissingFieldTypeInfoException ex) {
+ throw new CompilerPostPassException("Could not set up the solution set for workset iteration '" +
+ optNode.getOperator().getName() + "'. Missing type information for key field " + ex.getFieldNumber() + '.');
+ }
+ }
+
+ // done, we can now propagate our info down
+ try {
+ propagateToChannel(schema, iterationNode.getInitialSolutionSetInput(), createUtilities);
+ propagateToChannel(wss, iterationNode.getInitialWorksetInput(), createUtilities);
+ } catch (MissingFieldTypeInfoException ex) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '"
+ + iterationNode.getProgramOperator().getName() + "'. Missing type information for key field " +
+ ex.getFieldNumber());
+ }
+ }
+ else if (node instanceof SingleInputPlanNode) {
+ SingleInputPlanNode sn = (SingleInputPlanNode) node;
+
+ // get the node's current schema
+ T schema;
+ if (sn.postPassHelper == null) {
+ schema = createEmptySchema();
+ sn.postPassHelper = schema;
+ } else {
+ schema = (T) sn.postPassHelper;
+ }
+ schema.increaseNumConnectionsThatContributed();
+ SingleInputNode optNode = sn.getSingleInputNode();
+
+ // add the parent schema to the schema
+ if (propagateParentSchemaDown) {
+ addSchemaToSchema(parentSchema, schema, optNode, 0);
+ }
+
+ // check whether all outgoing channels have contributed yet; if not, come back later
+ if (schema.getNumConnectionsThatContributed() < sn.getOutgoingChannels().size()) {
+ return;
+ }
+
+ // add the node's local information
+ try {
+ getSingleInputNodeSchema(sn, schema);
+ } catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName()));
+ }
+
+ if (createUtilities) {
+ // parameterize the node's driver strategy
+ for (int i = 0; i < sn.getDriverStrategy().getNumRequiredComparators(); i++) {
+ try {
+ sn.setComparator(createComparator(sn.getKeys(i), sn.getSortOrders(i), schema), i);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for node '" +
+ optNode.getOperator().getName() + "'. Missing type information for key field " +
+ e.getFieldNumber());
+ }
+ }
+ }
+
+ // done, we can now propagate our info down
+ try {
+ propagateToChannel(schema, sn.getInput(), createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for input channel to node '" +
+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+
+ // don't forget the broadcast inputs
+ for (Channel c: sn.getBroadcastInputs()) {
+ try {
+ propagateToChannel(createEmptySchema(), c, createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" +
+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+ }
+ }
+ else if (node instanceof DualInputPlanNode) {
+ DualInputPlanNode dn = (DualInputPlanNode) node;
+
+ // get the node's current schema
+ T schema1;
+ T schema2;
+ if (dn.postPassHelper1 == null) {
+ schema1 = createEmptySchema();
+ schema2 = createEmptySchema();
+ dn.postPassHelper1 = schema1;
+ dn.postPassHelper2 = schema2;
+ } else {
+ schema1 = (T) dn.postPassHelper1;
+ schema2 = (T) dn.postPassHelper2;
+ }
+
+ schema1.increaseNumConnectionsThatContributed();
+ schema2.increaseNumConnectionsThatContributed();
+ TwoInputNode optNode = dn.getTwoInputNode();
+
+ // add the parent schema to the schema
+ if (propagateParentSchemaDown) {
+ addSchemaToSchema(parentSchema, schema1, optNode, 0);
+ addSchemaToSchema(parentSchema, schema2, optNode, 1);
+ }
+
+ // check whether all outgoing channels have contributed yet; if not, come back later
+ if (schema1.getNumConnectionsThatContributed() < dn.getOutgoingChannels().size()) {
+ return;
+ }
+
+ // add the node's local information
+ try {
+ getDualInputNodeSchema(dn, schema1, schema2);
+ } catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException(getConflictingTypeErrorMessage(e, optNode.getOperator().getName()));
+ }
+
+ // parameterize the node's driver strategy
+ if (createUtilities) {
+ if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
+ // set the individual comparators
+ try {
+ dn.setComparator1(createComparator(dn.getKeysForInput1(), dn.getSortOrders(), schema1));
+ dn.setComparator2(createComparator(dn.getKeysForInput2(), dn.getSortOrders(), schema2));
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for node '" +
+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+
+ // set the pair comparator
+ try {
+ dn.setPairComparator(createPairComparator(dn.getKeysForInput1(), dn.getKeysForInput2(),
+ dn.getSortOrders(), schema1, schema2));
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for node '" +
+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+
+ }
+ }
+
+ // done, we can now propagate our info down
+ try {
+ propagateToChannel(schema1, dn.getInput1(), createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for the first input channel to node '"
+ + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+ try {
+ propagateToChannel(schema2, dn.getInput2(), createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for the second input channel to node '"
+ + optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+
+ // don't forget the broadcast inputs
+ for (Channel c: dn.getBroadcastInputs()) {
+ try {
+ propagateToChannel(createEmptySchema(), c, createUtilities);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for broadcast channel in node '" +
+ optNode.getOperator().getName() + "'. Missing type information for field " + e.getFieldNumber());
+ }
+ }
+ }
+ else if (node instanceof NAryUnionPlanNode) {
+ // only propagate the info down
+ try {
+ for (Channel channel : node.getInputs()) {
+ propagateToChannel(parentSchema, channel, createUtilities);
+ }
+ } catch (MissingFieldTypeInfoException ex) {
+ throw new CompilerPostPassException("Could not set up runtime strategy for the input channel to " +
+ " a union node. Missing type information for field " + ex.getFieldNumber());
+ }
+ }
+ // catch the sources of the iterative step functions
+ else if (node instanceof BulkPartialSolutionPlanNode ||
+ node instanceof SolutionSetPlanNode ||
+ node instanceof WorksetPlanNode)
+ {
+ // get the node's current schema
+ T schema;
+ String name;
+ if (node instanceof BulkPartialSolutionPlanNode) {
+ BulkPartialSolutionPlanNode psn = (BulkPartialSolutionPlanNode) node;
+ if (psn.postPassHelper == null) {
+ schema = createEmptySchema();
+ psn.postPassHelper = schema;
+ } else {
+ schema = (T) psn.postPassHelper;
+ }
+ name = "partial solution of bulk iteration '" +
+ psn.getPartialSolutionNode().getIterationNode().getOperator().getName() + "'";
+ }
+ else if (node instanceof SolutionSetPlanNode) {
+ SolutionSetPlanNode ssn = (SolutionSetPlanNode) node;
+ if (ssn.postPassHelper == null) {
+ schema = createEmptySchema();
+ ssn.postPassHelper = schema;
+ } else {
+ schema = (T) ssn.postPassHelper;
+ }
+ name = "solution set of workset iteration '" +
+ ssn.getSolutionSetNode().getIterationNode().getOperator().getName() + "'";
+ }
+ else if (node instanceof WorksetPlanNode) {
+ WorksetPlanNode wsn = (WorksetPlanNode) node;
+ if (wsn.postPassHelper == null) {
+ schema = createEmptySchema();
+ wsn.postPassHelper = schema;
+ } else {
+ schema = (T) wsn.postPassHelper;
+ }
+ name = "workset of workset iteration '" +
+ wsn.getWorksetNode().getIterationNode().getOperator().getName() + "'";
+ } else {
+ throw new CompilerException("Unrecognized iteration head node: " + node.getClass().getName());
+ }
+
+ schema.increaseNumConnectionsThatContributed();
+
+ // add the parent schema to the schema
+ addSchemaToSchema(parentSchema, schema, name);
+ }
+ else {
+ throw new CompilerPostPassException("Unknown node type encountered: " + node.getClass().getName());
+ }
+ }
+
+ private void propagateToChannel(T schema, Channel channel, boolean createUtilities) throws MissingFieldTypeInfoException {
+ if (createUtilities) {
+ // the serializer always exists
+ channel.setSerializer(createSerializer(schema));
+
+ // parameterize the ship strategy
+ if (channel.getShipStrategy().requiresComparator()) {
+ channel.setShipStrategyComparator(
+ createComparator(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder(), schema));
+ }
+
+ // parameterize the local strategy
+ if (channel.getLocalStrategy().requiresComparator()) {
+ channel.setLocalStrategyComparator(
+ createComparator(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder(), schema));
+ }
+ }
+
+ // propagate the channel's source model
+ traverse(channel.getSource(), schema, createUtilities);
+ }
+
+ private void addSchemaToSchema(T sourceSchema, T targetSchema, String opName) {
+ try {
+ for (Map.Entry<Integer, X> entry : sourceSchema) {
+ Integer pos = entry.getKey();
+ targetSchema.addType(pos, entry.getValue());
+ }
+ } catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
+ + " in node '" + opName + "' propagated from successor node. " +
+ "Conflicting types: " + e.getPreviousType() + " and " + e.getNewType() +
+ ". Most probable cause: Invalid constant field annotations.");
+ }
+ }
+
+ private void addSchemaToSchema(T sourceSchema, T targetSchema, OptimizerNode optNode, int input) {
+ try {
+ for (Map.Entry<Integer, X> entry : sourceSchema) {
+ Integer pos = entry.getKey();
+ SemanticProperties sprops = optNode.getSemanticProperties();
+
+ if (sprops != null && sprops.getForwardingTargetFields(input, pos) != null && sprops.getForwardingTargetFields(input, pos).contains(pos)) {
+ targetSchema.addType(pos, entry.getValue());
+ }
+ }
+ } catch (ConflictingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Conflicting type information for field " + e.getFieldNumber()
+ + " in node '" + optNode.getOperator().getName() + "' propagated from successor node. " +
+ "Conflicting types: " + e.getPreviousType() + " and " + e.getNewType() +
+ ". Most probable cause: Invalid constant field annotations.");
+ }
+ }
+
+ private String getConflictingTypeErrorMessage(ConflictingFieldTypeInfoException e, String operatorName) {
+ return "Conflicting type information for field " + e.getFieldNumber()
+ + " in node '" + operatorName + "' between types declared in the node's "
+ + "contract and types inferred from successor contracts. Conflicting types: "
+ + e.getPreviousType() + " and " + e.getNewType()
+ + ". Most probable cause: Invalid constant field annotations.";
+ }
+
+ private TypeSerializerFactory<?> createSerializer(T schema, PlanNode node) {
+ try {
+ return createSerializer(schema);
+ } catch (MissingFieldTypeInfoException e) {
+ throw new CompilerPostPassException("Missing type information while creating serializer for '" +
+ node.getProgramOperator().getName() + "'.");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Type-specific methods that extract schema information
+ // --------------------------------------------------------------------------------------------
+
+ protected abstract T createEmptySchema();
+
+ protected abstract void getSinkSchema(SinkPlanNode sink, T schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
+
+ protected abstract void getSingleInputNodeSchema(SingleInputPlanNode node, T schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
+
+ protected abstract void getDualInputNodeSchema(DualInputPlanNode node, T input1Schema, T input2Schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException;
+
+ // --------------------------------------------------------------------------------------------
+ // Methods to create serializers and comparators
+ // --------------------------------------------------------------------------------------------
+
+ protected abstract TypeSerializerFactory<?> createSerializer(T schema) throws MissingFieldTypeInfoException;
+
+ protected abstract TypeComparatorFactory<?> createComparator(FieldList fields, boolean[] directions, T schema) throws MissingFieldTypeInfoException;
+
+ protected abstract TypePairComparatorFactory<?, ?> createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections,
+ T schema1, T schema2) throws MissingFieldTypeInfoException;
+}
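For orientation, a minimal sketch of how such a post pass is driven. This is illustrative
only: `plan` is assumed to be an OptimizedPlan produced by the optimizer, and
RecordModelPostPass (defined later in this commit) is the concrete subclass for the
Record data model.

    // Hedged sketch: wiring a concrete post pass to an optimized plan.
    GenericFlatTypePostPass<Class<? extends Key<?>>, SparseKeySchema> postPass =
        new RecordModelPostPass();
    postPass.setPropagateParentSchemaDown(true); // propagate parent schemas into inputs (the default)
    postPass.postPass(plan);                     // traverses from each sink, filling in serializers/comparators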
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
new file mode 100644
index 0000000..5fdf3dd
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/JavaApiPostPass.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerPostPassException;
+import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
+import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
+import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.plan.WorksetPlanNode;
+import org.apache.flink.optimizer.util.NoOpUnaryUdfOp;
+import org.apache.flink.runtime.operators.DriverStrategy;
+
+/**
+ * The post-optimizer plan traversal. This traversal fills in the API-specific utilities
+ * (serializers and comparators).
+ */
+public class JavaApiPostPass implements OptimizerPostPass {
+
+ private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>();
+
+ private ExecutionConfig executionConfig = null;
+
+ @Override
+ public void postPass(OptimizedPlan plan) {
+
+ executionConfig = plan.getOriginalPactPlan().getExecutionConfig();
+
+ for (SinkPlanNode sink : plan.getDataSinks()) {
+ traverse(sink);
+ }
+ }
+
+
+ protected void traverse(PlanNode node) {
+ if (!alreadyDone.add(node)) {
+ // already worked on that one
+ return;
+ }
+
+ // distinguish the node types
+ if (node instanceof SinkPlanNode) {
+ // descend to the input channel
+ SinkPlanNode sn = (SinkPlanNode) node;
+ Channel inchannel = sn.getInput();
+ traverseChannel(inchannel);
+ }
+ else if (node instanceof SourcePlanNode) {
+ TypeInformation<?> typeInfo = getTypeInfoFromSource((SourcePlanNode) node);
+ ((SourcePlanNode) node).setSerializer(createSerializer(typeInfo));
+ }
+ else if (node instanceof BulkIterationPlanNode) {
+ BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
+
+ if (iterationNode.getRootOfStepFunction() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile an iteration step function where next partial solution is created by a Union node.");
+ }
+
+ // traverse the termination criterion for the first time; needed in case of an intermediate termination criterion
+ if (iterationNode.getRootOfTerminationCriterion() != null) {
+ SingleInputPlanNode addMapper = (SingleInputPlanNode) iterationNode.getRootOfTerminationCriterion();
+ traverseChannel(addMapper.getInput());
+ }
+
+ BulkIterationBase<?> operator = (BulkIterationBase<?>) iterationNode.getProgramOperator();
+
+ // set the serializer
+ iterationNode.setSerializerForIterationChannel(createSerializer(operator.getOperatorInfo().getOutputType()));
+
+ // done, we can now propagate our info down
+ traverseChannel(iterationNode.getInput());
+ traverse(iterationNode.getRootOfStepFunction());
+ }
+ else if (node instanceof WorksetIterationPlanNode) {
+ WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
+
+ if (iterationNode.getNextWorkSetPlanNode() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile a workset iteration step function where the next workset is produced by a Union node.");
+ }
+ if (iterationNode.getSolutionSetDeltaPlanNode() instanceof NAryUnionPlanNode) {
+ throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node.");
+ }
+
+ DeltaIterationBase<?, ?> operator = (DeltaIterationBase<?, ?>) iterationNode.getProgramOperator();
+
+ // set the serializers and comparators for the workset iteration
+ iterationNode.setSolutionSetSerializer(createSerializer(operator.getOperatorInfo().getFirstInputType()));
+ iterationNode.setWorksetSerializer(createSerializer(operator.getOperatorInfo().getSecondInputType()));
+ iterationNode.setSolutionSetComparator(createComparator(operator.getOperatorInfo().getFirstInputType(),
+ iterationNode.getSolutionSetKeyFields(), getSortOrders(iterationNode.getSolutionSetKeyFields(), null)));
+
+ // traverse the inputs
+ traverseChannel(iterationNode.getInput1());
+ traverseChannel(iterationNode.getInput2());
+
+ // traverse the step function
+ traverse(iterationNode.getSolutionSetDeltaPlanNode());
+ traverse(iterationNode.getNextWorkSetPlanNode());
+ }
+ else if (node instanceof SingleInputPlanNode) {
+ SingleInputPlanNode sn = (SingleInputPlanNode) node;
+
+ if (!(sn.getOptimizerNode().getOperator() instanceof SingleInputOperator)) {
+
+ // Special case for delta iterations
+ if (sn.getOptimizerNode().getOperator() instanceof NoOpUnaryUdfOp) {
+ traverseChannel(sn.getInput());
+ return;
+ } else {
+ throw new RuntimeException("Wrong operator type found in post pass.");
+ }
+ }
+
+ SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) sn.getOptimizerNode().getOperator();
+
+ // parameterize the node's driver strategy
+ for (int i = 0; i < sn.getDriverStrategy().getNumRequiredComparators(); i++) {
+ sn.setComparator(createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(i),
+ getSortOrders(sn.getKeys(i), sn.getSortOrders(i))), i);
+ }
+ // done, we can now propagate our info down
+ traverseChannel(sn.getInput());
+
+ // don't forget the broadcast inputs
+ for (Channel c: sn.getBroadcastInputs()) {
+ traverseChannel(c);
+ }
+ }
+ else if (node instanceof DualInputPlanNode) {
+ DualInputPlanNode dn = (DualInputPlanNode) node;
+
+ if (!(dn.getOptimizerNode().getOperator() instanceof DualInputOperator)) {
+ throw new RuntimeException("Wrong operator type found in post pass.");
+ }
+
+ DualInputOperator<?, ?, ?, ?> dualInputOperator = (DualInputOperator<?, ?, ?, ?>) dn.getOptimizerNode().getOperator();
+
+ // parameterize the node's driver strategy
+ if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
+ dn.setComparator1(createComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dn.getKeysForInput1(),
+ getSortOrders(dn.getKeysForInput1(), dn.getSortOrders())));
+ dn.setComparator2(createComparator(dualInputOperator.getOperatorInfo().getSecondInputType(), dn.getKeysForInput2(),
+ getSortOrders(dn.getKeysForInput2(), dn.getSortOrders())));
+
+ dn.setPairComparator(createPairComparator(dualInputOperator.getOperatorInfo().getFirstInputType(),
+ dualInputOperator.getOperatorInfo().getSecondInputType()));
+
+ }
+
+ traverseChannel(dn.getInput1());
+ traverseChannel(dn.getInput2());
+
+ // don't forget the broadcast inputs
+ for (Channel c: dn.getBroadcastInputs()) {
+ traverseChannel(c);
+ }
+
+ }
+ // catch the sources of the iterative step functions
+ else if (node instanceof BulkPartialSolutionPlanNode ||
+ node instanceof SolutionSetPlanNode ||
+ node instanceof WorksetPlanNode)
+ {
+ // nothing to do here; the serializers for these nodes are set by the enclosing iteration node
+ }
+ else if (node instanceof NAryUnionPlanNode) {
+ // Traverse to all child channels
+ for (Channel channel : node.getInputs()) {
+ traverseChannel(channel);
+ }
+ }
+ else {
+ throw new CompilerPostPassException("Unknown node type encountered: " + node.getClass().getName());
+ }
+ }
+
+ private void traverseChannel(Channel channel) {
+
+ PlanNode source = channel.getSource();
+ Operator<?> javaOp = source.getProgramOperator();
+
+ TypeInformation<?> type = javaOp.getOperatorInfo().getOutputType();
+
+ if (javaOp instanceof GroupReduceOperatorBase &&
+ (source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_REDUCE_COMBINE)) {
+ GroupReduceOperatorBase<?, ?, ?> groupNode = (GroupReduceOperatorBase<?, ?, ?>) javaOp;
+ type = groupNode.getInput().getOperatorInfo().getOutputType();
+ }
+ else if (javaOp instanceof PlanUnwrappingReduceGroupOperator &&
+ source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE) {
+ PlanUnwrappingReduceGroupOperator<?, ?, ?> groupNode = (PlanUnwrappingReduceGroupOperator<?, ?, ?>) javaOp;
+ type = groupNode.getInput().getOperatorInfo().getOutputType();
+ }
+
+ // the serializer always exists
+ channel.setSerializer(createSerializer(type));
+
+ // parameterize the ship strategy
+ if (channel.getShipStrategy().requiresComparator()) {
+ channel.setShipStrategyComparator(createComparator(type, channel.getShipStrategyKeys(),
+ getSortOrders(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder())));
+ }
+
+ // parameterize the local strategy
+ if (channel.getLocalStrategy().requiresComparator()) {
+ channel.setLocalStrategyComparator(createComparator(type, channel.getLocalStrategyKeys(),
+ getSortOrders(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder())));
+ }
+
+ // descend to the channel's source
+ traverse(channel.getSource());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private static <T> TypeInformation<T> getTypeInfoFromSource(SourcePlanNode node) {
+ Operator<?> op = node.getOptimizerNode().getOperator();
+
+ if (op instanceof GenericDataSourceBase) {
+ return ((GenericDataSourceBase<T, ?>) op).getOperatorInfo().getOutputType();
+ } else {
+ throw new RuntimeException("Wrong operator type found in post pass.");
+ }
+ }
+
+ private <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
+ TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
+
+ return new RuntimeSerializerFactory<T>(serializer, typeInfo.getTypeClass());
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {
+
+ TypeComparator<T> comparator;
+ if (typeInfo instanceof CompositeType) {
+ comparator = ((CompositeType<T>) typeInfo).createComparator(keys.toArray(), sortOrder, 0, executionConfig);
+ }
+ else if (typeInfo instanceof AtomicType) {
+ // handle grouping of atomic types
+ comparator = ((AtomicType<T>) typeInfo).createComparator(sortOrder[0], executionConfig);
+ }
+ else {
+ throw new RuntimeException("Unrecognized type: " + typeInfo);
+ }
+
+ return new RuntimeComparatorFactory<T>(comparator);
+ }
+
+ private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1,T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
+ if (!(typeInfo1.isTupleType() || typeInfo1 instanceof PojoTypeInfo) ||
+ !(typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
+ throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
+ }
+
+ return new RuntimePairComparatorFactory<T1,T2>();
+ }
+
+ private static boolean[] getSortOrders(FieldList keys, boolean[] orders) {
+ if (orders == null) {
+ orders = new boolean[keys.size()];
+ Arrays.fill(orders, true);
+ }
+ return orders;
+ }
+}
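The serializer and comparator factories above are driven entirely by TypeInformation. As a
small illustrative sketch of the same pattern outside the post pass (assuming an
ExecutionConfig named `config`; BasicTypeInfo implements AtomicType and lives in flink-core):

    // Sketch of the TypeInformation-driven pattern behind createSerializer/createComparator.
    TypeInformation<Integer> intInfo = BasicTypeInfo.INT_TYPE_INFO;
    TypeSerializer<Integer> serializer = intInfo.createSerializer(config);
    TypeComparator<Integer> comparator =
        ((AtomicType<Integer>) intInfo).createComparator(true, config); // ascending, as in the AtomicType branch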
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
new file mode 100644
index 0000000..b9f6bfa
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/MissingFieldTypeInfoException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+public final class MissingFieldTypeInfoException extends Exception {
+
+ private static final long serialVersionUID = 8749941961302509358L;
+
+ private final int fieldNumber;
+
+ public MissingFieldTypeInfoException(int fieldNumber) {
+ this.fieldNumber = fieldNumber;
+ }
+
+ public int getFieldNumber() {
+ return fieldNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
new file mode 100644
index 0000000..ba0b7c7
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/OptimizerPostPass.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.optimizer.postpass;
+
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+
+/**
+ * Interface for visitors that process the optimizer's plan. Typical post-processing applications
+ * are schema finalization or the generation/parameterization of utilities for the actual data model.
+ */
+public interface OptimizerPostPass {
+
+ /**
+ * Central post processing function. Invoked by the optimizer after the best plan has
+ * been determined.
+ *
+ * @param plan The plan to be post processed.
+ */
+ void postPass(OptimizedPlan plan);
+}
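A trivial conforming implementation, to illustrate the contract (hypothetical, not part of
this commit):

    // Hypothetical post pass that only enumerates the sinks; a real implementation
    // descends from each sink, as JavaApiPostPass in this commit does.
    public class SinkListingPostPass implements OptimizerPostPass {
        @Override
        public void postPass(OptimizedPlan plan) {
            for (SinkPlanNode sink : plan.getDataSinks()) {
                System.out.println("post-processing sink: " + sink);
            }
        }
    }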
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
new file mode 100644
index 0000000..1fc4c34
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/PostPassUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.types.Key;
+
+
+public class PostPassUtils {
+
+ public static <X> Class<? extends Key<?>>[] getKeys(AbstractSchema<Class<? extends X>> schema, int[] fields) throws MissingFieldTypeInfoException {
+ @SuppressWarnings("unchecked")
+ Class<? extends Key<?>>[] keyTypes = new Class[fields.length];
+
+ for (int i = 0; i < fields.length; i++) {
+ Class<? extends X> type = schema.getType(fields[i]);
+ if (type == null) {
+ throw new MissingFieldTypeInfoException(i);
+ } else if (Key.class.isAssignableFrom(type)) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Key<?>> keyType = (Class<? extends Key<?>>) type;
+ keyTypes[i] = keyType;
+ } else {
+ throw new CompilerException("The field type " + type.getName() +
+ " cannot be used as a key because it does not implement the interface 'Key'");
+ }
+ }
+
+ return keyTypes;
+ }
+}
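Illustrative use (hypothetical values): resolving key classes from a sparse schema, mirroring
what RecordModelPostPass.createComparator does below. IntValue is one of the Key
implementations in flink-core; both calls declare checked exceptions, noted in the comments.

    SparseKeySchema schema = new SparseKeySchema();
    schema.addType(0, IntValue.class);                  // may throw ConflictingFieldTypeInfoException
    Class<? extends Key<?>>[] keyTypes =
        PostPassUtils.getKeys(schema, new int[] {0});   // may throw MissingFieldTypeInfoException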
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
new file mode 100644
index 0000000..8a2d006
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/RecordModelPostPass.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import org.apache.flink.api.common.operators.DualInputOperator;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.Ordering;
+import org.apache.flink.api.common.operators.RecordOperator;
+import org.apache.flink.api.common.operators.SingleInputOperator;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.CompilerPostPassException;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.types.Key;
+
+/**
+ * Post pass implementation for the Record data model. Performs only type inference and
+ * creates serializers and comparators.
+ */
+public class RecordModelPostPass extends GenericFlatTypePostPass<Class<? extends Key<?>>, SparseKeySchema> {
+
+ // --------------------------------------------------------------------------------------------
+ // Type-specific methods that extract schema information
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ protected SparseKeySchema createEmptySchema() {
+ return new SparseKeySchema();
+ }
+
+ @Override
+ protected void getSinkSchema(SinkPlanNode sinkPlanNode, SparseKeySchema schema) throws CompilerPostPassException {
+ GenericDataSinkBase<?> sink = sinkPlanNode.getSinkNode().getOperator();
+ Ordering partitioning = sink.getPartitionOrdering();
+ Ordering sorting = sink.getLocalOrder();
+
+ try {
+ if (partitioning != null) {
+ addOrderingToSchema(partitioning, schema);
+ }
+ if (sorting != null) {
+ addOrderingToSchema(sorting, schema);
+ }
+ } catch (ConflictingFieldTypeInfoException ex) {
+ throw new CompilerPostPassException("Conflicting information found when adding data sink types. " +
+ "Probable reason is contradicting type infos for partitioning and sorting ordering.");
+ }
+ }
+
+ @Override
+ protected void getSingleInputNodeSchema(SingleInputPlanNode node, SparseKeySchema schema)
+ throws CompilerPostPassException, ConflictingFieldTypeInfoException
+ {
+ // check that we got the right types
+ SingleInputOperator<?, ?, ?> contract = (SingleInputOperator<?, ?, ?>) node.getSingleInputNode().getOperator();
+ if (!(contract instanceof RecordOperator)) {
+ throw new CompilerPostPassException("Error: Operator is not a Record-based contract. Wrong compiler invocation.");
+ }
+ RecordOperator recContract = (RecordOperator) contract;
+
+ // add the information to the schema
+ int[] localPositions = contract.getKeyColumns(0);
+ Class<? extends Key<?>>[] types = recContract.getKeyClasses();
+ for (int i = 0; i < localPositions.length; i++) {
+ schema.addType(localPositions[i], types[i]);
+ }
+
+ // this is a temporary fix; we should solve this more generically
+ if (contract instanceof GroupReduceOperatorBase) {
+ Ordering groupOrder = ((GroupReduceOperatorBase<?, ?, ?>) contract).getGroupOrder();
+ if (groupOrder != null) {
+ addOrderingToSchema(groupOrder, schema);
+ }
+ }
+ }
+
+ @Override
+ protected void getDualInputNodeSchema(DualInputPlanNode node, SparseKeySchema input1Schema, SparseKeySchema input2Schema)
+ throws CompilerPostPassException, ConflictingFieldTypeInfoException
+ {
+ // add the node's local information; this automatically checks consistency
+ DualInputOperator<?, ?, ?, ?> contract = node.getTwoInputNode().getOperator();
+ if (!(contract instanceof RecordOperator)) {
+ throw new CompilerPostPassException("Error: Operator is not a Pact Record-based contract. Wrong compiler invocation.");
+ }
+
+ RecordOperator recContract = (RecordOperator) contract;
+ int[] localPositions1 = contract.getKeyColumns(0);
+ int[] localPositions2 = contract.getKeyColumns(1);
+ Class<? extends Key<?>>[] types = recContract.getKeyClasses();
+
+ if (localPositions1.length != localPositions2.length) {
+ throw new CompilerException("Error: The keys for the first and second input have a different number of fields.");
+ }
+
+ for (int i = 0; i < localPositions1.length; i++) {
+ input1Schema.addType(localPositions1[i], types[i]);
+ }
+ for (int i = 0; i < localPositions2.length; i++) {
+ input2Schema.addType(localPositions2[i], types[i]);
+ }
+
+ // this is a temporary fix; we should solve this more generically
+ if (contract instanceof CoGroupOperatorBase) {
+ Ordering groupOrder1 = ((CoGroupOperatorBase<?, ?, ?, ?>) contract).getGroupOrderForInputOne();
+ Ordering groupOrder2 = ((CoGroupOperatorBase<?, ?, ?, ?>) contract).getGroupOrderForInputTwo();
+
+ if (groupOrder1 != null) {
+ addOrderingToSchema(groupOrder1, input1Schema);
+ }
+ if (groupOrder2 != null) {
+ addOrderingToSchema(groupOrder2, input2Schema);
+ }
+ }
+ }
+
+ private void addOrderingToSchema(Ordering o, SparseKeySchema schema) throws ConflictingFieldTypeInfoException {
+ for (int i = 0; i < o.getNumberOfFields(); i++) {
+ Integer pos = o.getFieldNumber(i);
+ Class<? extends Key<?>> type = o.getType(i);
+ schema.addType(pos, type);
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Methods to create serializers and comparators
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ protected TypeSerializerFactory<?> createSerializer(SparseKeySchema schema) {
+ return RecordSerializerFactory.get();
+ }
+
+ @Override
+ protected RecordComparatorFactory createComparator(FieldList fields, boolean[] directions, SparseKeySchema schema)
+ throws MissingFieldTypeInfoException
+ {
+ int[] positions = fields.toArray();
+ Class<? extends Key<?>>[] keyTypes = PostPassUtils.getKeys(schema, positions);
+ return new RecordComparatorFactory(positions, keyTypes, directions);
+ }
+
+ @Override
+ protected RecordPairComparatorFactory createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections,
+ SparseKeySchema schema1, SparseKeySchema schema2)
+ {
+ return RecordPairComparatorFactory.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
new file mode 100644
index 0000000..e14888e
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/postpass/SparseKeySchema.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.postpass;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.types.Key;
+
+/**
+ * Class encapsulating a schema map (int column position -> column type) and a reference counter.
+ */
+public class SparseKeySchema extends AbstractSchema<Class<? extends Key<?>>> {
+
+ private final Map<Integer, Class<? extends Key<?>>> schema;
+
+
+ public SparseKeySchema() {
+ this.schema = new HashMap<Integer, Class<? extends Key<?>>>();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void addType(int key, Class<? extends Key<?>> type) throws ConflictingFieldTypeInfoException {
+ Class<? extends Key<?>> previous = this.schema.put(key, type);
+ if (previous != null && previous != type) {
+ throw new ConflictingFieldTypeInfoException(key, previous, type);
+ }
+ }
+
+ @Override
+ public Class<? extends Key<?>> getType(int field) {
+ return this.schema.get(field);
+ }
+
+ @Override
+ public Iterator<Entry<Integer, Class<? extends Key<?>>>> iterator() {
+ return this.schema.entrySet().iterator();
+ }
+
+ public int getNumTypes() {
+ return this.schema.size();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return this.schema.hashCode() ^ getNumConnectionsThatContributed();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof SparseKeySchema) {
+ SparseKeySchema other = (SparseKeySchema) obj;
+ return this.schema.equals(other.schema) &&
+ this.getNumConnectionsThatContributed() == other.getNumConnectionsThatContributed();
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "<" + getNumConnectionsThatContributed() + "> : " + this.schema.toString();
+ }
+}
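
For illustration (a sketch, not part of this commit): how the conflict detection in SparseKeySchema#addType behaves. This assumes ConflictingFieldTypeInfoException is publicly visible and uses the IntValue and StringValue key types from org.apache.flink.types.

    import org.apache.flink.optimizer.postpass.ConflictingFieldTypeInfoException;
    import org.apache.flink.optimizer.postpass.SparseKeySchema;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.StringValue;

    public class SparseKeySchemaSketch {

        public static void main(String[] args) {
            SparseKeySchema schema = new SparseKeySchema();
            try {
                schema.addType(0, IntValue.class);     // first type registered for field 0
                schema.addType(0, IntValue.class);     // same type again: no conflict
                schema.addType(1, StringValue.class);  // different field: independent entry
                schema.addType(0, StringValue.class);  // contradicts IntValue: throws
            } catch (ConflictingFieldTypeInfoException e) {
                System.out.println("conflicting type for field 0: " + e);
            }
        }
    }
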
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
new file mode 100644
index 0000000..bd35b5c
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BinaryUnionReplacer.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.plan.BinaryUnionPlanNode;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.IterationPlanNode;
+import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.util.Visitor;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A traversal that collapses cascading binary unions into a single n-ary
+ * union operator. The exception is when one of the union inputs is materialized, such as in the
+ * static-code-path cache in iterations.
+ */
+public class BinaryUnionReplacer implements Visitor<PlanNode> {
+
+ private final Set<PlanNode> seenBefore = new HashSet<PlanNode>();
+
+ @Override
+ public boolean preVisit(PlanNode visitable) {
+ if (this.seenBefore.add(visitable)) {
+ if (visitable instanceof IterationPlanNode) {
+ ((IterationPlanNode) visitable).acceptForStepFunction(this);
+ }
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void postVisit(PlanNode visitable) {
+
+ if (visitable instanceof BinaryUnionPlanNode) {
+
+ final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
+ final Channel in1 = unionNode.getInput1();
+ final Channel in2 = unionNode.getInput2();
+
+ if (!unionNode.unionsStaticAndDynamicPath()) {
+
+ // both on static path, or both on dynamic path. we can collapse them
+ NAryUnionPlanNode newUnionNode;
+
+ List<Channel> inputs = new ArrayList<Channel>();
+ collect(in1, inputs);
+ collect(in2, inputs);
+
+ newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs,
+ unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());
+
+ newUnionNode.setParallelism(unionNode.getParallelism());
+
+ for (Channel c : inputs) {
+ c.setTarget(newUnionNode);
+ }
+
+ for (Channel channel : unionNode.getOutgoingChannels()) {
+ channel.swapUnionNodes(newUnionNode);
+ newUnionNode.addOutgoingChannel(channel);
+ }
+ }
+ else {
+ // union between the static and the dynamic path. for now, we need to handle this
+ // through a special union operator
+
+ // make sure that the first input is the cached (static) and the second input is the dynamic
+ if (in1.isOnDynamicPath()) {
+ BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);
+
+ in1.setTarget(newUnionNode);
+ in2.setTarget(newUnionNode);
+
+ for (Channel channel : unionNode.getOutgoingChannels()) {
+ channel.swapUnionNodes(newUnionNode);
+ newUnionNode.addOutgoingChannel(channel);
+ }
+ }
+ }
+ }
+ }
+
+ public void collect(Channel in, List<Channel> inputs) {
+ if (in.getSource() instanceof NAryUnionPlanNode) {
+ // sanity check
+ if (in.getShipStrategy() != ShipStrategyType.FORWARD) {
+ throw new CompilerException("Bug: Plan generation for Unions picked a ship strategy between binary plan operators.");
+ }
+ if (!(in.getLocalStrategy() == null || in.getLocalStrategy() == LocalStrategy.NONE)) {
+ throw new CompilerException("Bug: Plan generation for Unions picked a local strategy between binary plan operators.");
+ }
+
+ inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
+ } else {
+ // is not a collapsed union node, so we take the channel directly
+ inputs.add(in);
+ }
+ }
+}
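
A usage sketch (not part of this commit): the replacer runs over a finished plan via the visitor mechanism, turning a cascade like union(union(a, b), c) into one NAryUnionPlanNode with inputs [a, b, c]. It assumes OptimizedPlan exposes an accept(Visitor<PlanNode>) entry point, as the other plan traversals do.

    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.optimizer.traversals.BinaryUnionReplacer;

    public class UnionCollapseSketch {

        // Hypothetical helper: collapses cascaded binary unions in place.
        static void collapseUnions(OptimizedPlan plan) {
            plan.accept(new BinaryUnionReplacer());
        }
    }
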
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
new file mode 100644
index 0000000..4730546
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/BranchesVisitor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * This traversal of the optimizer DAG computes the information needed to track
+ * branches and joins in the data flow. This is important to support plans
+ * that are not minimally connected DAGs: such plans are not trees, because at least one node
+ * feeds its output into more than one other node.
+ */
+public final class BranchesVisitor implements Visitor<OptimizerNode> {
+
+ @Override
+ public boolean preVisit(OptimizerNode node) {
+ return node.getOpenBranches() == null;
+ }
+
+ @Override
+ public void postVisit(OptimizerNode node) {
+ if (node instanceof IterationNode) {
+ ((IterationNode) node).acceptForStepFunction(this);
+ }
+
+ node.computeUnclosedBranchStack();
+ }
+}
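
A usage sketch under the same caveat: the visitor is started from every data sink, and the preVisit guard (getOpenBranches() == null) keeps already-processed nodes from being traversed twice.

    import java.util.List;

    import org.apache.flink.optimizer.dag.DataSinkNode;
    import org.apache.flink.optimizer.traversals.BranchesVisitor;

    public class BranchTrackingSketch {

        // Hypothetical driver: computes the unclosed-branch stacks bottom-up.
        static void computeBranches(List<DataSinkNode> sinks) {
            BranchesVisitor branchingVisitor = new BranchesVisitor();
            for (DataSinkNode sink : sinks) {
                sink.accept(branchingVisitor);
            }
        }
    }
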
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
new file mode 100644
index 0000000..160ef95
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/GraphCreatingVisitor.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.api.common.ExecutionMode;
+import org.apache.flink.api.common.InvalidProgramException;
+import org.apache.flink.api.common.operators.GenericDataSinkBase;
+import org.apache.flink.api.common.operators.GenericDataSourceBase;
+import org.apache.flink.api.common.operators.Operator;
+import org.apache.flink.api.common.operators.Union;
+import org.apache.flink.api.common.operators.base.BulkIterationBase;
+import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
+import org.apache.flink.api.common.operators.base.CrossOperatorBase;
+import org.apache.flink.api.common.operators.base.DeltaIterationBase;
+import org.apache.flink.api.common.operators.base.FilterOperatorBase;
+import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
+import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase;
+import org.apache.flink.api.common.operators.base.MapOperatorBase;
+import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
+import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.dag.BinaryUnionNode;
+import org.apache.flink.optimizer.dag.BulkIterationNode;
+import org.apache.flink.optimizer.dag.BulkPartialSolutionNode;
+import org.apache.flink.optimizer.dag.CoGroupNode;
+import org.apache.flink.optimizer.dag.CollectorMapNode;
+import org.apache.flink.optimizer.dag.CrossNode;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.DataSinkNode;
+import org.apache.flink.optimizer.dag.DataSourceNode;
+import org.apache.flink.optimizer.dag.FilterNode;
+import org.apache.flink.optimizer.dag.FlatMapNode;
+import org.apache.flink.optimizer.dag.GroupCombineNode;
+import org.apache.flink.optimizer.dag.GroupReduceNode;
+import org.apache.flink.optimizer.dag.JoinNode;
+import org.apache.flink.optimizer.dag.MapNode;
+import org.apache.flink.optimizer.dag.MapPartitionNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.optimizer.dag.PartitionNode;
+import org.apache.flink.optimizer.dag.ReduceNode;
+import org.apache.flink.optimizer.dag.SolutionSetNode;
+import org.apache.flink.optimizer.dag.SortPartitionNode;
+import org.apache.flink.optimizer.dag.WorksetIterationNode;
+import org.apache.flink.optimizer.dag.WorksetNode;
+import org.apache.flink.util.Visitor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This traversal creates the optimizer DAG from a program.
+ * It works as a visitor that walks the program's flow in a depth-first fashion, starting from the data sinks.
+ * During the descent, it creates an optimizer node for each operator, data source, and data sink.
+ * During the ascent, it connects the nodes into the full graph.
+ */
+public class GraphCreatingVisitor implements Visitor<Operator<?>> {
+
+ private final Map<Operator<?>, OptimizerNode> con2node; // map from the operator objects to their
+ // corresponding optimizer nodes
+
+ private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan
+
+ private final int defaultParallelism; // the default degree of parallelism
+
+ private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation
+
+ private final ExecutionMode defaultDataExchangeMode;
+
+ private final boolean forceDOP;
+
+
+ public GraphCreatingVisitor(int defaultParallelism, ExecutionMode defaultDataExchangeMode) {
+ this(null, false, defaultParallelism, defaultDataExchangeMode, null);
+ }
+
+ private GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int defaultParallelism,
+ ExecutionMode dataExchangeMode, HashMap<Operator<?>, OptimizerNode> closure) {
+ if (closure == null){
+ con2node = new HashMap<Operator<?>, OptimizerNode>();
+ } else {
+ con2node = closure;
+ }
+
+ this.sinks = new ArrayList<DataSinkNode>(2);
+ this.defaultParallelism = defaultParallelism;
+ this.parent = parent;
+ this.defaultDataExchangeMode = dataExchangeMode;
+ this.forceDOP = forceDOP;
+ }
+
+ public List<DataSinkNode> getSinks() {
+ return sinks;
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public boolean preVisit(Operator<?> c) {
+ // check if we have been here before
+ if (this.con2node.containsKey(c)) {
+ return false;
+ }
+
+ final OptimizerNode n;
+
+ // create a node for the operator (or sink or source) if we have not been here before
+ if (c instanceof GenericDataSinkBase) {
+ DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
+ this.sinks.add(dsn);
+ n = dsn;
+ }
+ else if (c instanceof GenericDataSourceBase) {
+ n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
+ }
+ else if (c instanceof MapOperatorBase) {
+ n = new MapNode((MapOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof MapPartitionOperatorBase) {
+ n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof org.apache.flink.api.common.operators.base.CollectorMapOperatorBase) {
+ n = new CollectorMapNode((org.apache.flink.api.common.operators.base.CollectorMapOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof FlatMapOperatorBase) {
+ n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof FilterOperatorBase) {
+ n = new FilterNode((FilterOperatorBase<?, ?>) c);
+ }
+ else if (c instanceof ReduceOperatorBase) {
+ n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
+ }
+ else if (c instanceof GroupCombineOperatorBase) {
+ n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof GroupReduceOperatorBase) {
+ n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
+ }
+ else if (c instanceof JoinOperatorBase) {
+ n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
+ }
+ else if (c instanceof CoGroupOperatorBase) {
+ n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
+ }
+ else if (c instanceof CrossOperatorBase) {
+ n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
+ }
+ else if (c instanceof BulkIterationBase) {
+ n = new BulkIterationNode((BulkIterationBase<?>) c);
+ }
+ else if (c instanceof DeltaIterationBase) {
+ n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
+ }
+ else if (c instanceof Union){
+ n = new BinaryUnionNode((Union<?>) c);
+ }
+ else if (c instanceof PartitionOperatorBase) {
+ n = new PartitionNode((PartitionOperatorBase<?>) c);
+ }
+ else if (c instanceof SortPartitionOperatorBase) {
+ n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
+ }
+ else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
+ if (this.parent == null) {
+ throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+ }
+
+ final BulkIterationBase.PartialSolutionPlaceHolder<?> holder = (BulkIterationBase.PartialSolutionPlaceHolder<?>) c;
+ final BulkIterationBase<?> enclosingIteration = holder.getContainingBulkIteration();
+ final BulkIterationNode containingIterationNode =
+ (BulkIterationNode) this.parent.con2node.get(enclosingIteration);
+
+ // catch this for the recursive translation of step functions
+ BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
+ p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ n = p;
+ }
+ else if (c instanceof DeltaIterationBase.WorksetPlaceHolder) {
+ if (this.parent == null) {
+ throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+ }
+
+ final DeltaIterationBase.WorksetPlaceHolder<?> holder = (DeltaIterationBase.WorksetPlaceHolder<?>) c;
+ final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
+ final WorksetIterationNode containingIterationNode =
+ (WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
+
+ // catch this for the recursive translation of step functions
+ WorksetNode p = new WorksetNode(holder, containingIterationNode);
+ p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ n = p;
+ }
+ else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
+ if (this.parent == null) {
+ throw new InvalidProgramException("It is currently not supported to create data sinks inside iterations.");
+ }
+
+ final DeltaIterationBase.SolutionSetPlaceHolder<?> holder = (DeltaIterationBase.SolutionSetPlaceHolder<?>) c;
+ final DeltaIterationBase<?, ?> enclosingIteration = holder.getContainingWorksetIteration();
+ final WorksetIterationNode containingIterationNode =
+ (WorksetIterationNode) this.parent.con2node.get(enclosingIteration);
+
+ // catch this for the recursive translation of step functions
+ SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
+ p.setDegreeOfParallelism(containingIterationNode.getParallelism());
+ n = p;
+ }
+ else {
+ throw new IllegalArgumentException("Unknown operator type: " + c);
+ }
+
+ this.con2node.put(c, n);
+
+ // set the parallelism only if it has not been set before. some nodes have a fixed DOP, such as the
+ // key-less reducer (all-reduce)
+ if (n.getParallelism() < 1) {
+ // set the degree of parallelism
+ int par = c.getDegreeOfParallelism();
+ if (par > 0) {
+ if (this.forceDOP && par != this.defaultParallelism) {
+ par = this.defaultParallelism;
+ Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
+ "currently fixed to the parallelism of the surrounding operator (the iteration).");
+ }
+ } else {
+ par = this.defaultParallelism;
+ }
+ n.setDegreeOfParallelism(par);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void postVisit(Operator<?> c) {
+
+ OptimizerNode n = this.con2node.get(c);
+
+ // first connect to the predecessors
+ n.setInput(this.con2node, this.defaultDataExchangeMode);
+ n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
+
+ // if the node represents a bulk iteration, we recursively translate the data flow now
+ if (n instanceof BulkIterationNode) {
+ final BulkIterationNode iterNode = (BulkIterationNode) n;
+ final BulkIterationBase<?> iter = iterNode.getIterationContract();
+
+ // pass a copy of the non-iterative part into the iteration translation,
+ // in case the iteration references its closure
+ HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
+
+ // first, recursively build the data flow for the step function
+ final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
+ iterNode.getParallelism(), defaultDataExchangeMode, closure);
+
+ BulkPartialSolutionNode partialSolution;
+
+ iter.getNextPartialSolution().accept(recursiveCreator);
+
+ partialSolution = (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
+ OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
+ if (partialSolution == null) {
+ throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
+ }
+
+ OptimizerNode terminationCriterion = null;
+
+ if (iter.getTerminationCriterion() != null) {
+ terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
+
+ // no intermediate node yet, traverse from the termination criterion to build the missing parts
+ if (terminationCriterion == null) {
+ iter.getTerminationCriterion().accept(recursiveCreator);
+ terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
+ }
+ }
+
+ iterNode.setPartialSolution(partialSolution);
+ iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
+
+ // go over the contained data flow and mark the dynamic path nodes
+ StaticDynamicPathIdentifier identifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
+ iterNode.acceptForStepFunction(identifier);
+ }
+ else if (n instanceof WorksetIterationNode) {
+ final WorksetIterationNode iterNode = (WorksetIterationNode) n;
+ final DeltaIterationBase<?, ?> iter = iterNode.getIterationContract();
+
+ // we need to ensure that both the next-workset and the solution-set-delta depend on the workset.
+ // One check comes for free during the translation; we do the other check here as a precondition
+ {
+ StepFunctionValidator wsf = new StepFunctionValidator();
+ iter.getNextWorkset().accept(wsf);
+ if (!wsf.hasFoundWorkset()) {
+ throw new CompilerException("In the given program, the next workset does not depend on the workset. " +
+ "This is a prerequisite in delta iterations.");
+ }
+ }
+
+ // calculate the closure of the anonymous function
+ HashMap<Operator<?>, OptimizerNode> closure = new HashMap<Operator<?>, OptimizerNode>(con2node);
+
+ // first, recursively build the data flow for the step function
+ final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(
+ this, true, iterNode.getParallelism(), defaultDataExchangeMode, closure);
+
+ // descend from the solution set delta. check that it depends on both the workset
+ // and the solution set. If it does depend on both, this descent should create both nodes
+ iter.getSolutionSetDelta().accept(recursiveCreator);
+
+ final WorksetNode worksetNode = (WorksetNode) recursiveCreator.con2node.get(iter.getWorkset());
+
+ if (worksetNode == null) {
+ throw new CompilerException("In the given program, the solution set delta does not depend on the workset." +
+ "This is a prerequisite in delta iterations.");
+ }
+
+ iter.getNextWorkset().accept(recursiveCreator);
+
+ SolutionSetNode solutionSetNode = (SolutionSetNode) recursiveCreator.con2node.get(iter.getSolutionSet());
+
+ if (solutionSetNode == null || solutionSetNode.getOutgoingConnections() == null || solutionSetNode.getOutgoingConnections().isEmpty()) {
+ solutionSetNode = new SolutionSetNode((DeltaIterationBase.SolutionSetPlaceHolder<?>) iter.getSolutionSet(), iterNode);
+ }
+ else {
+ for (DagConnection conn : solutionSetNode.getOutgoingConnections()) {
+ OptimizerNode successor = conn.getTarget();
+
+ if (successor.getClass() == JoinNode.class) {
+ // find out which input of the join the solution set is
+ JoinNode mn = (JoinNode) successor;
+ if (mn.getFirstPredecessorNode() == solutionSetNode) {
+ mn.makeJoinWithSolutionSet(0);
+ } else if (mn.getSecondPredecessorNode() == solutionSetNode) {
+ mn.makeJoinWithSolutionSet(1);
+ } else {
+ throw new CompilerException();
+ }
+ }
+ else if (successor.getClass() == CoGroupNode.class) {
+ CoGroupNode cg = (CoGroupNode) successor;
+ if (cg.getFirstPredecessorNode() == solutionSetNode) {
+ cg.makeCoGroupWithSolutionSet(0);
+ } else if (cg.getSecondPredecessorNode() == solutionSetNode) {
+ cg.makeCoGroupWithSolutionSet(1);
+ } else {
+ throw new CompilerException();
+ }
+ }
+ else {
+ throw new InvalidProgramException(
+ "Error: The only operations allowed on the solution set are Join and CoGroup.");
+ }
+ }
+ }
+
+ final OptimizerNode nextWorksetNode = recursiveCreator.con2node.get(iter.getNextWorkset());
+ final OptimizerNode solutionSetDeltaNode = recursiveCreator.con2node.get(iter.getSolutionSetDelta());
+
+ // set the step function nodes to the iteration node
+ iterNode.setPartialSolution(solutionSetNode, worksetNode);
+ iterNode.setNextPartialSolution(solutionSetDeltaNode, nextWorksetNode, defaultDataExchangeMode);
+
+ // go over the contained data flow and mark the dynamic path nodes
+ StaticDynamicPathIdentifier pathIdentifier = new StaticDynamicPathIdentifier(iterNode.getCostWeight());
+ iterNode.acceptForStepFunction(pathIdentifier);
+ }
+ }
+}
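
A minimal sketch (not part of this commit) of how the translation is kicked off: the program's Plan is visited once, and the collected sinks form the roots of the optimizer DAG. The parallelism and exchange mode below are placeholder values.

    import java.util.List;

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.common.Plan;
    import org.apache.flink.optimizer.dag.DataSinkNode;
    import org.apache.flink.optimizer.traversals.GraphCreatingVisitor;

    public class DagCreationSketch {

        // Translates a user program into optimizer DAG nodes and returns the sinks.
        static List<DataSinkNode> createOptimizerDag(Plan program) {
            GraphCreatingVisitor graphCreator =
                    new GraphCreatingVisitor(8, ExecutionMode.PIPELINED);  // placeholder parallelism
            program.accept(graphCreator);
            return graphCreator.getSinks();
        }
    }
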
http://git-wip-us.apache.org/repos/asf/flink/blob/633b0d6a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
new file mode 100644
index 0000000..b5c09e5
--- /dev/null
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/traversals/IdAndEstimatesVisitor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.optimizer.traversals;
+
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.dag.DagConnection;
+import org.apache.flink.optimizer.dag.IterationNode;
+import org.apache.flink.optimizer.dag.OptimizerNode;
+import org.apache.flink.util.Visitor;
+
+/**
+ * This traversal of the optimizer DAG assigns IDs to each node (in a pre-order fashion),
+ * and calls each node to compute its estimates. The latter happens in the postVisit function,
+ * where it is guaranteed that all predecessors have computed their estimates.
+ */
+public class IdAndEstimatesVisitor implements Visitor<OptimizerNode> {
+
+ private final DataStatistics statistics;
+
+ private int id = 1;
+
+ public IdAndEstimatesVisitor(DataStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ @Override
+ public boolean preVisit(OptimizerNode visitable) {
+ return visitable.getId() == -1;
+ }
+
+ @Override
+ public void postVisit(OptimizerNode visitable) {
+ // the node ids
+ visitable.initId(this.id++);
+
+ // connections need to figure out their maximum path depths
+ for (DagConnection conn : visitable.getIncomingConnections()) {
+ conn.initMaxDepth();
+ }
+ for (DagConnection conn : visitable.getBroadcastConnections()) {
+ conn.initMaxDepth();
+ }
+
+ // the estimates
+ visitable.computeOutputEstimates(this.statistics);
+
+ // if required, recurse into the step function
+ if (visitable instanceof IterationNode) {
+ ((IterationNode) visitable).acceptForStepFunction(this);
+ }
+ }
+}
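
And a matching sketch for this traversal: a single visitor instance is shared across all sinks so that the ID counter stays global; the preVisit guard (getId() == -1) skips nodes that already received an ID through another sink.

    import java.util.List;

    import org.apache.flink.optimizer.DataStatistics;
    import org.apache.flink.optimizer.dag.DataSinkNode;
    import org.apache.flink.optimizer.traversals.IdAndEstimatesVisitor;

    public class IdAndEstimatesSketch {

        // Hypothetical driver: assigns IDs and computes output estimates bottom-up.
        static void assignIdsAndEstimates(List<DataSinkNode> sinks, DataStatistics statistics) {
            IdAndEstimatesVisitor visitor = new IdAndEstimatesVisitor(statistics);
            for (DataSinkNode sink : sinks) {
                sink.accept(visitor);
            }
        }
    }
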