Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:53 UTC
[37/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
deleted file mode 100644
index 743ee4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-public class JSONGenerator {
-
- public static final String STEPS = "step_function";
- public static final String ID = "id";
- public static final String SIDE = "side";
- public static final String SHIP_STRATEGY = "ship_strategy";
- public static final String PREDECESSORS = "predecessors";
- public static final String TYPE = "type";
- public static final String PACT = "pact";
- public static final String CONTENTS = "contents";
- public static final String PARALLELISM = "parallelism";
-
- private StreamGraph streamGraph;
-
- public JSONGenerator(StreamGraph streamGraph) {
- this.streamGraph = streamGraph;
- }
-
- public String getJSON() throws JSONException {
- JSONObject json = new JSONObject();
- JSONArray nodes = new JSONArray();
- json.put("nodes", nodes);
- List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
- Collections.sort(operatorIDs, new Comparator<Integer>() {
- @Override
- public int compare(Integer o1, Integer o2) {
- // put sinks at the back, otherwise order by ID; comparing the sink
- // flags first keeps the comparator symmetric and transitive
- boolean sink1 = streamGraph.getSinkIDs().contains(o1);
- boolean sink2 = streamGraph.getSinkIDs().contains(o2);
- if (sink1 == sink2) {
- return o1.compareTo(o2);
- } else {
- return sink1 ? 1 : -1;
- }
- }
- });
- visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
- return json.toString();
- }
-
- private void visit(JSONArray jsonArray, List<Integer> toVisit,
- Map<Integer, Integer> edgeRemappings) throws JSONException {
-
- Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
- if (streamGraph.getSourceIDs().contains(vertexID)
- || Collections.disjoint(vertex.getInEdges(), toVisit)) {
-
- JSONObject node = new JSONObject();
- decorateNode(vertexID, node);
-
- if (!streamGraph.getSourceIDs().contains(vertexID)) {
- JSONArray inputs = new JSONArray();
- node.put(PREDECESSORS, inputs);
-
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int inputID = inEdge.getSourceId();
-
- Integer mappedID = edgeRemappings.containsKey(inputID) ? edgeRemappings.get(inputID) : inputID;
- decorateEdge(inputs, vertexID, mappedID, inputID);
- }
- }
- jsonArray.put(node);
- toVisit.remove(vertexID);
- } else {
- Integer iterationHead = -1;
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int operator = inEdge.getSourceId();
-
- if (streamGraph.vertexIDtoLoopTimeout.containsKey(operator)) {
- iterationHead = operator;
- }
- }
-
- JSONObject obj = new JSONObject();
- JSONArray iterationSteps = new JSONArray();
- obj.put(STEPS, iterationSteps);
- obj.put(ID, iterationHead);
- obj.put(PACT, "IterativeDataStream");
- obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
- obj.put(CONTENTS, "Stream Iteration");
- JSONArray iterationInputs = new JSONArray();
- obj.put(PREDECESSORS, iterationInputs);
- toVisit.remove(iterationHead);
- visitIteration(iterationSteps, toVisit, iterationHead, edgeRemappings, iterationInputs);
- jsonArray.put(obj);
- }
-
- if (!toVisit.isEmpty()) {
- visit(jsonArray, toVisit, edgeRemappings);
- }
- }
-
- private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
- Map<Integer, Integer> edgeRemappings, JSONArray iterationInEdges) throws JSONException {
-
- Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getStreamNode(vertexID);
- toVisit.remove(vertexID);
-
- // Ignoring head and tail to avoid redundancy
- if (!streamGraph.vertexIDtoLoopTimeout.containsKey(vertexID)) {
- JSONObject obj = new JSONObject();
- jsonArray.put(obj);
- decorateNode(vertexID, obj);
- JSONArray inEdges = new JSONArray();
- obj.put(PREDECESSORS, inEdges);
-
- for (StreamEdge inEdge : vertex.getInEdges()) {
- int inputID = inEdge.getSourceId();
-
- if (edgeRemappings.containsKey(inputID)) {
- decorateEdge(inEdges, vertexID, inputID, inputID);
- } else if (!streamGraph.vertexIDtoLoopTimeout.containsKey(inputID)) {
- decorateEdge(iterationInEdges, vertexID, inputID, inputID);
- }
- }
-
- edgeRemappings.put(vertexID, headId);
- visitIteration(jsonArray, toVisit, headId, edgeRemappings, iterationInEdges);
- }
-
- }
-
- private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
- throws JSONException {
- JSONObject input = new JSONObject();
- inputArray.put(input);
- input.put(ID, mappedInputID);
- input.put(SHIP_STRATEGY, streamGraph.getStreamEdge(inputID, vertexID).getPartitioner());
- // this input object is already in the array, so the first input sees length 1
- input.put(SIDE, (inputArray.length() == 1) ? "first" : "second");
- }
-
- private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
-
- StreamNode vertex = streamGraph.getStreamNode(vertexID);
-
- node.put(ID, vertexID);
- node.put(TYPE, vertex.getOperatorName());
-
- if (streamGraph.getSourceIDs().contains(vertexID)) {
- node.put(PACT, "Data Source");
- } else if (streamGraph.getSinkIDs().contains(vertexID)) {
- node.put(PACT, "Data Sink");
- } else {
- node.put(PACT, "Operator");
- }
-
- node.put(CONTENTS, vertex.getOperatorName());
- node.put(PARALLELISM, vertex.getParallelism());
- }
-
-}
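In this 0.10-era codebase the generator is not normally instantiated by hand; the JSON plan is usually reached through the execution environment, whose getExecutionPlan() is assumed here to build the StreamGraph and return its JSON plan via this class. A minimal sketch under that assumption, using only public API calls:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JsonPlanSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .print();
        // prints a JSON document with a "nodes" array whose entries carry the
        // keys defined above: "id", "type", "pact", "contents", "parallelism",
        // and a "predecessors" array with "ship_strategy" entries
        System.out.println(env.getExecutionPlan());
    }
}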
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
deleted file mode 100644
index 11bf84f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.util.ClassLoaderUtil;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
-import org.apache.flink.util.InstantiationUtil;
-
-public class StreamConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- // ------------------------------------------------------------------------
- // Config Keys
- // ------------------------------------------------------------------------
-
- private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
- private static final String NUMBER_OF_INPUTS = "numberOfInputs";
- private static final String CHAINED_OUTPUTS = "chainedOutputs";
- private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
- private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
- private static final String VERTEX_NAME = "vertexID";
- private static final String ITERATION_ID = "iterationId";
- private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
- private static final String SERIALIZEDUDF = "serializedUDF";
- private static final String USER_FUNCTION = "userFunction";
- private static final String BUFFER_TIMEOUT = "bufferTimeout";
- private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
- private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
- private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out";
- private static final String ITERATION_WAIT = "iterationWait";
- private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
- private static final String EDGES_IN_ORDER = "edgesInOrder";
- private static final String OUT_STREAM_EDGES = "outStreamEdges";
- private static final String IN_STREAM_EDGES = "inStreamEdges";
-
- private static final String CHECKPOINTING_ENABLED = "checkpointing";
- private static final String CHECKPOINT_MODE = "checkpointMode";
-
- private static final String STATE_BACKEND = "statebackend";
- private static final String STATE_PARTITIONER = "statePartitioner";
- private static final String STATE_KEY_SERIALIZER = "statekeyser";
-
-
- // ------------------------------------------------------------------------
- // Default Values
- // ------------------------------------------------------------------------
-
- private static final long DEFAULT_TIMEOUT = 100;
- private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
-
-
- // ------------------------------------------------------------------------
- // Config
- // ------------------------------------------------------------------------
-
- private final Configuration config;
-
- public StreamConfig(Configuration config) {
- this.config = config;
- }
-
- public Configuration getConfiguration() {
- return config;
- }
-
- // ------------------------------------------------------------------------
- // Configured Properties
- // ------------------------------------------------------------------------
-
- public void setVertexID(Integer vertexID) {
- config.setInteger(VERTEX_NAME, vertexID);
- }
-
- public Integer getVertexID() {
- return config.getInteger(VERTEX_NAME, -1);
- }
-
- public void setTypeSerializerIn1(TypeSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
- }
-
- public void setTypeSerializerIn2(TypeSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
- }
-
- public void setTypeSerializerOut(TypeSerializer<?> serializer) {
- setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
- }
-
- public <T> TypeSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_1, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate serializer.", e);
- }
- }
-
- public <T> TypeSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_IN_2, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate serializer.", e);
- }
- }
-
- public <T> TypeSerializer<T> getTypeSerializerOut(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, TYPE_SERIALIZER_OUT_1, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate serializer.", e);
- }
- }
-
- private void setTypeSerializer(String key, TypeSerializer<?> typeWrapper) {
- try {
- InstantiationUtil.writeObjectToConfig(typeWrapper, this.config, key);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize type serializer.", e);
- }
- }
-
- public void setBufferTimeout(long timeout) {
- config.setLong(BUFFER_TIMEOUT, timeout);
- }
-
- public long getBufferTimeout() {
- return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
- }
-
- public void setStreamOperator(StreamOperator<?> operator) {
- if (operator != null) {
- config.setClass(USER_FUNCTION, operator.getClass());
-
- try {
- InstantiationUtil.writeObjectToConfig(operator, this.config, SERIALIZEDUDF);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize operator object "
- + operator.getClass() + ".", e);
- }
- }
- }
-
- public <T> T getStreamOperator(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
- }
- catch (ClassNotFoundException e) {
- String classLoaderInfo = ClassLoaderUtil.getUserCodeClassLoaderInfo(cl);
- boolean loadableDoubleCheck = ClassLoaderUtil.validateClassLoadable(e, cl);
-
- String exceptionMessage = "Cannot load user class: " + e.getMessage()
- + "\nClassLoader info: " + classLoaderInfo +
- (loadableDoubleCheck ?
- "\nClass was actually found in classloader - deserialization issue." :
- "\nClass not resolvable through given classloader.");
-
- throw new StreamTaskException(exceptionMessage, e);
- }
- catch (Exception e) {
- throw new StreamTaskException("Cannot instantiate user function.", e);
- }
- }
-
- public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
- try {
- InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, OUTPUT_SELECTOR_WRAPPER);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize OutputSelectorWrapper.", e);
- }
- }
-
- public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper.", e);
- }
- }
-
- public void setIterationId(String iterationId) {
- config.setString(ITERATION_ID, iterationId);
- }
-
- public String getIterationId() {
- return config.getString(ITERATION_ID, "");
- }
-
- public void setIterationWaitTime(long time) {
- config.setLong(ITERATION_WAIT, time);
- }
-
- public long getIterationWaitTime() {
- return config.getLong(ITERATION_WAIT, 0);
- }
-
- public void setNumberOfInputs(int numberOfInputs) {
- config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
- }
-
- public int getNumberOfInputs() {
- return config.getInteger(NUMBER_OF_INPUTS, 0);
- }
-
- public void setNumberOfOutputs(int numberOfOutputs) {
- config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
- }
-
- public int getNumberOfOutputs() {
- return config.getInteger(NUMBER_OF_OUTPUTS, 0);
- }
-
- public void setNonChainedOutputs(List<StreamEdge> nonChainedOutputs) {
- try {
- InstantiationUtil.writeObjectToConfig(nonChainedOutputs, this.config, NONCHAINED_OUTPUTS);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize non chained outputs.", e);
- }
- }
-
- public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
- try {
- List<StreamEdge> nonChainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
- return nonChainedOutputs == null ? new ArrayList<StreamEdge>() : nonChainedOutputs;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate non chained outputs.", e);
- }
- }
-
- public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
- try {
- InstantiationUtil.writeObjectToConfig(chainedOutputs, this.config, CHAINED_OUTPUTS);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize chained outputs.", e);
- }
- }
-
- public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
- try {
- List<StreamEdge> chainedOutputs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl);
- return chainedOutputs == null ? new ArrayList<StreamEdge>() : chainedOutputs;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate chained outputs.", e);
- }
- }
-
- public void setOutEdges(List<StreamEdge> outEdges) {
- try {
- InstantiationUtil.writeObjectToConfig(outEdges, this.config, OUT_STREAM_EDGES);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize outward edges.", e);
- }
- }
-
- public List<StreamEdge> getOutEdges(ClassLoader cl) {
- try {
- List<StreamEdge> outEdges = InstantiationUtil.readObjectFromConfig(this.config, OUT_STREAM_EDGES, cl);
- return outEdges == null ? new ArrayList<StreamEdge>() : outEdges;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate outputs.", e);
- }
- }
-
- public void setInPhysicalEdges(List<StreamEdge> inEdges) {
- try {
- InstantiationUtil.writeObjectToConfig(inEdges, this.config, IN_STREAM_EDGES);
- } catch (IOException e) {
- throw new StreamTaskException("Cannot serialize inward edges.", e);
- }
- }
-
- public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
- try {
- List<StreamEdge> inEdges = InstantiationUtil.readObjectFromConfig(this.config, IN_STREAM_EDGES, cl);
- return inEdges == null ? new ArrayList<StreamEdge>() : inEdges;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate inputs.", e);
- }
- }
-
- // --------------------- checkpointing -----------------------
-
- public void setCheckpointingEnabled(boolean enabled) {
- config.setBoolean(CHECKPOINTING_ENABLED, enabled);
- }
-
- public boolean isCheckpointingEnabled() {
- return config.getBoolean(CHECKPOINTING_ENABLED, false);
- }
-
- public void setCheckpointMode(CheckpointingMode mode) {
- config.setInteger(CHECKPOINT_MODE, mode.ordinal());
- }
-
- public CheckpointingMode getCheckpointMode() {
- int ordinal = config.getInteger(CHECKPOINT_MODE, -1);
- if (ordinal >= 0) {
- return CheckpointingMode.values()[ordinal];
- } else {
- return DEFAULT_CHECKPOINTING_MODE;
- }
- }
-
-
- public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
- try {
- InstantiationUtil.writeObjectToConfig(outEdgeList, this.config, EDGES_IN_ORDER);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize outputs in order.", e);
- }
- }
-
- public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
- try {
- List<StreamEdge> outEdgesInOrder = InstantiationUtil.readObjectFromConfig(this.config, EDGES_IN_ORDER, cl);
- return outEdgesInOrder == null ? new ArrayList<StreamEdge>() : outEdgesInOrder;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate outputs in order.", e);
- }
- }
-
- public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
-
- try {
- InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize configuration.", e);
- }
- }
-
- public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
- try {
- Map<Integer, StreamConfig> confs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
- return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate configuration.", e);
- }
- }
-
- // ------------------------------------------------------------------------
- // State backend
- // ------------------------------------------------------------------------
-
- public void setStateBackend(StateBackend<?> backend) {
- try {
- InstantiationUtil.writeObjectToConfig(backend, this.config, STATE_BACKEND);
- } catch (Exception e) {
- throw new StreamTaskException("Could not serialize stateHandle provider.", e);
- }
- }
-
- public StateBackend<?> getStateBackend(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, STATE_BACKEND, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate statehandle provider.", e);
- }
- }
-
- public void setStatePartitioner(KeySelector<?, ?> partitioner) {
- try {
- InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize state partitioner.", e);
- }
- }
-
- public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate state partitioner.", e);
- }
- }
-
- public void setStateKeySerializer(TypeSerializer<?> serializer) {
- try {
- InstantiationUtil.writeObjectToConfig(serializer, this.config, STATE_KEY_SERIALIZER);
- } catch (IOException e) {
- throw new StreamTaskException("Could not serialize state key serializer.", e);
- }
- }
-
- public <K> TypeSerializer<K> getStateKeySerializer(ClassLoader cl) {
- try {
- return InstantiationUtil.readObjectFromConfig(this.config, STATE_KEY_SERIALIZER, cl);
- } catch (Exception e) {
- throw new StreamTaskException("Could not instantiate state key serializer from task config.", e);
- }
- }
-
- // ------------------------------------------------------------------------
- // Miscellaneous
- // ------------------------------------------------------------------------
-
- public void setChainStart() {
- config.setBoolean(IS_CHAINED_VERTEX, true);
- }
-
- public boolean isChainStart() {
- return config.getBoolean(IS_CHAINED_VERTEX, false);
- }
-
- @Override
- public String toString() {
-
- ClassLoader cl = getClass().getClassLoader();
-
- StringBuilder builder = new StringBuilder();
- builder.append("\n=======================");
- builder.append("Stream Config");
- builder.append("=======================");
- builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
- builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
- builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
- builder.append("\nPartitioning:");
- for (StreamEdge output : getNonChainedOutputs(cl)) {
- int targetId = output.getTargetId();
- builder.append("\n\t").append(targetId).append(": ").append(output.getPartitioner());
- }
-
- builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
-
- try {
- builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
- }
- catch (Exception e) {
- builder.append("\nOperator: Missing");
- }
- builder.append("\nBuffer timeout: ").append(getBufferTimeout());
- builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
- if (isChainStart() && getChainedOutputs(cl).size() > 0) {
- builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
- builder.append(getTransitiveChainedTaskConfigs(cl));
- }
-
- return builder.toString();
- }
-}
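StreamConfig is a thin, typed view over a plain Flink Configuration: every setter writes under one of the string keys declared at the top of the class, and every getter reads it back with a default. A minimal round-trip sketch; Configuration is the standard org.apache.flink.configuration.Configuration, and the raw key name below comes from the constants in this file:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.StreamConfig;

public class StreamConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        StreamConfig streamConfig = new StreamConfig(conf);

        streamConfig.setVertexID(42);
        streamConfig.setBufferTimeout(50);
        streamConfig.setNumberOfInputs(1);

        System.out.println(streamConfig.getVertexID());      // 42
        System.out.println(streamConfig.getBufferTimeout()); // 50
        // the same values are visible under the raw keys of the wrapped config
        System.out.println(conf.getInteger("vertexID", -1)); // 42
    }
}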
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
deleted file mode 100644
index c252870..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-
-/**
- * An edge in the streaming topology. An edge does not necessarily get
- * converted to a connection between two job vertices (due to
- * chaining/optimization).
- */
-public class StreamEdge implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private final String edgeId;
-
- private final StreamNode sourceVertex;
- private final StreamNode targetVertex;
-
- /**
- * The type number of the input for co-tasks.
- */
- private final int typeNumber;
-
- /**
- * A list of output names that the target vertex listens to (if there is
- * output selection).
- */
- private final List<String> selectedNames;
- private StreamPartitioner<?> outputPartitioner;
-
- public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
- List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
- this.sourceVertex = sourceVertex;
- this.targetVertex = targetVertex;
- this.typeNumber = typeNumber;
- this.selectedNames = selectedNames;
- this.outputPartitioner = outputPartitioner;
-
- this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
- + "_" + outputPartitioner;
- }
-
- public StreamNode getSourceVertex() {
- return sourceVertex;
- }
-
- public StreamNode getTargetVertex() {
- return targetVertex;
- }
-
- public int getSourceId() {
- return sourceVertex.getId();
- }
-
- public int getTargetId() {
- return targetVertex.getId();
- }
-
- public int getTypeNumber() {
- return typeNumber;
- }
-
- public List<String> getSelectedNames() {
- return selectedNames;
- }
-
- public StreamPartitioner<?> getPartitioner() {
- return outputPartitioner;
- }
-
- public void setPartitioner(StreamPartitioner<?> partitioner) {
- this.outputPartitioner = partitioner;
- }
-
- @Override
- public int hashCode() {
- return edgeId.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StreamEdge that = (StreamEdge) o;
-
- return edgeId.equals(that.edgeId);
- }
-
- @Override
- public String toString() {
- return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
- + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
- + ')';
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
deleted file mode 100644
index be020d7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
-import org.apache.flink.api.java.typeutils.MissingTypeInfo;
-import org.apache.flink.optimizer.plan.StreamingPlan;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
-import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
-import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
-import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-import org.apache.sling.commons.json.JSONException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Class representing the streaming topology. It contains all the information
- * necessary to build the {@link JobGraph} for execution.
- */
-public class StreamGraph extends StreamingPlan {
-
- /** The default interval for checkpoints, in milliseconds */
- public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
-
- private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
-
- private final StreamExecutionEnvironment environment;
- private final ExecutionConfig executionConfig;
-
- private CheckpointingMode checkpointingMode;
- private boolean checkpointingEnabled = false;
- private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
- private boolean chaining = true;
-
- private Map<Integer, StreamNode> streamNodes;
- private Set<Integer> sources;
- private Set<Integer> sinks;
- private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
- private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
-
- protected Map<Integer, String> vertexIDtoBrokerID;
- protected Map<Integer, Long> vertexIDtoLoopTimeout;
- private StateBackend<?> stateBackend;
- private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
-
- private boolean forceCheckpoint = false;
-
- public StreamGraph(StreamExecutionEnvironment environment) {
-
- this.environment = environment;
- executionConfig = environment.getConfig();
-
- // create an empty new stream graph.
- clear();
- }
-
- /**
- * Removes all registered nodes and edges.
- */
- public void clear() {
- streamNodes = Maps.newHashMap();
- virtualSelectNodes = Maps.newHashMap();
- virtualPartitionNodes = Maps.newHashMap();
- vertexIDtoBrokerID = Maps.newHashMap();
- vertexIDtoLoopTimeout = Maps.newHashMap();
- iterationSourceSinkPairs = Sets.newHashSet();
- sources = Sets.newHashSet();
- sinks = Sets.newHashSet();
- }
-
- protected ExecutionConfig getExecutionConfig() {
- return executionConfig;
- }
-
- public void setJobName(String jobName) {
- this.jobName = jobName;
- }
-
- public void setChaining(boolean chaining) {
- this.chaining = chaining;
- }
-
- public void setCheckpointingEnabled(boolean checkpointingEnabled) {
- this.checkpointingEnabled = checkpointingEnabled;
- }
-
- public void setCheckpointingInterval(long checkpointingInterval) {
- this.checkpointingInterval = checkpointingInterval;
- }
-
- public void forceCheckpoint() {
- this.forceCheckpoint = true;
- }
-
- public void setStateBackend(StateBackend<?> backend) {
- this.stateBackend = backend;
- }
-
- public StateBackend<?> getStateBackend() {
- return this.stateBackend;
- }
-
- public long getCheckpointingInterval() {
- return checkpointingInterval;
- }
-
- public boolean isChainingEnabled() {
- return chaining;
- }
-
- // Checkpointing
-
- public boolean isCheckpointingEnabled() {
- return checkpointingEnabled;
- }
-
- public CheckpointingMode getCheckpointingMode() {
- return checkpointingMode;
- }
-
- public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
- this.checkpointingMode = checkpointingMode;
- }
-
-
- public boolean isIterative() {
- return !vertexIDtoLoopTimeout.isEmpty();
- }
-
- public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
- TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
- addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
- sources.add(vertexID);
- }
-
- public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
- TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
- addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
- sinks.add(vertexID);
- }
-
- public <IN, OUT> void addOperator(
- Integer vertexID,
- StreamOperator<OUT> operatorObject,
- TypeInformation<IN> inTypeInfo,
- TypeInformation<OUT> outTypeInfo,
- String operatorName) {
-
- if (operatorObject instanceof StreamSource) {
- addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
- } else {
- addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
- }
-
- TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
-
- TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo) ? outTypeInfo.createSerializer(executionConfig) : null;
-
- setSerializers(vertexID, inSerializer, null, outSerializer);
-
- if (operatorObject instanceof OutputTypeConfigurable) {
- @SuppressWarnings("unchecked")
- OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
- // sets the output type which must be known at StreamGraph creation time
- outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
- }
-
- if (operatorObject instanceof InputTypeConfigurable) {
- InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
- inputTypeConfigurable.setInputType(inTypeInfo, executionConfig);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Vertex: {}", vertexID);
- }
- }
-
- public <IN1, IN2, OUT> void addCoOperator(
- Integer vertexID,
- TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
- TypeInformation<IN1> in1TypeInfo,
- TypeInformation<IN2> in2TypeInfo,
- TypeInformation<OUT> outTypeInfo,
- String operatorName) {
-
- addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
-
- TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
- outTypeInfo.createSerializer(executionConfig) : null;
-
- setSerializers(vertexID, in1TypeInfo.createSerializer(executionConfig), in2TypeInfo.createSerializer(executionConfig), outSerializer);
-
- if (taskOperatorObject instanceof OutputTypeConfigurable) {
- @SuppressWarnings("unchecked")
- OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject;
- // sets the output type which must be known at StreamGraph creation time
- outputTypeConfigurable.setOutputType(outTypeInfo, executionConfig);
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CO-TASK: {}", vertexID);
- }
- }
-
- protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
- StreamOperator<?> operatorObject, String operatorName) {
-
- if (streamNodes.containsKey(vertexID)) {
- throw new RuntimeException("Duplicate vertexID " + vertexID);
- }
-
- StreamNode vertex = new StreamNode(environment, vertexID, operatorObject, operatorName,
- new ArrayList<OutputSelector<?>>(), vertexClass);
-
- streamNodes.put(vertexID, vertex);
-
- return vertex;
- }
-
- /**
- * Adds a new virtual node that is used to connect a downstream vertex to only the outputs
- * with the selected names.
- *
- * When adding an edge from the virtual node to a downstream node, the connection is made
- * to the original node, restricted to the selected names given here.
- *
- * @param originalId ID of the original node to connect to.
- * @param virtualId ID of the virtual node.
- * @param selectedNames The selected names.
- */
- public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
-
- if (virtualSelectNodes.containsKey(virtualId)) {
- throw new IllegalStateException("Already has virtual select node with id " + virtualId);
- }
-
- virtualSelectNodes.put(virtualId,
- new Tuple2<Integer, List<String>>(originalId, selectedNames));
- }
-
- /**
- * Adds a new virtual node that is used to connect a downstream vertex to an input with a certain
- * partitioning.
- *
- * When adding an edge from the virtual node to a downstream node, the connection is made
- * to the original node, but with the partitioning given here.
- *
- * @param originalId ID of the original node to connect to.
- * @param virtualId ID of the virtual node.
- * @param partitioner The partitioner.
- */
- public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
-
- if (virtualPartitionNodes.containsKey(virtualId)) {
- throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
- }
-
- virtualPartitionNodes.put(virtualId,
- new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
- }
-
- public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
- addEdgeInternal(upStreamVertexID,
- downStreamVertexID,
- typeNumber,
- null,
- Lists.<String>newArrayList());
-
- }
-
- private void addEdgeInternal(Integer upStreamVertexID,
- Integer downStreamVertexID,
- int typeNumber,
- StreamPartitioner<?> partitioner,
- List<String> outputNames) {
-
-
- if (virtualSelectNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
- if (outputNames.isEmpty()) {
- // selections that happen downstream override earlier selections
- outputNames = virtualSelectNodes.get(virtualId).f1;
- }
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
- } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
- int virtualId = upStreamVertexID;
- upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
- if (partitioner == null) {
- partitioner = virtualPartitionNodes.get(virtualId).f1;
- }
- addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames);
- } else {
- StreamNode upstreamNode = getStreamNode(upStreamVertexID);
- StreamNode downstreamNode = getStreamNode(downStreamVertexID);
-
- // If no partitioner was specified and the parallelism of upstream and downstream
- // operators matches, use forward partitioning; otherwise use rebalance.
- if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
- partitioner = new ForwardPartitioner<Object>();
- } else if (partitioner == null) {
- partitioner = new RebalancePartitioner<Object>();
- }
-
- if (partitioner instanceof ForwardPartitioner) {
- if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
- throw new UnsupportedOperationException("Forward partitioning does not allow " +
- "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
- ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
- " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
- }
- }
-
- StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
-
- getStreamNode(edge.getSourceId()).addOutEdge(edge);
- getStreamNode(edge.getTargetId()).addInEdge(edge);
- }
- }
-
- public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
- if (virtualPartitionNodes.containsKey(vertexID)) {
- addOutputSelector(virtualPartitionNodes.get(vertexID).f0, outputSelector);
- } else if (virtualSelectNodes.containsKey(vertexID)) {
- addOutputSelector(virtualSelectNodes.get(vertexID).f0, outputSelector);
- } else {
- getStreamNode(vertexID).addOutputSelector(outputSelector);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Outputselector set for {}", vertexID);
- }
- }
-
- }
-
- public void setParallelism(Integer vertexID, int parallelism) {
- if (getStreamNode(vertexID) != null) {
- getStreamNode(vertexID).setParallelism(parallelism);
- }
- }
-
- public void setKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
- StreamNode node = getStreamNode(vertexID);
- node.setStatePartitioner(keySelector);
- node.setStateKeySerializer(keySerializer);
- }
-
- public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
- if (getStreamNode(vertexID) != null) {
- getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
- }
- }
-
- public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
- StreamNode vertex = getStreamNode(vertexID);
- vertex.setSerializerIn1(in1);
- vertex.setSerializerIn2(in2);
- vertex.setSerializerOut(out);
- }
-
- public void setSerializersFrom(Integer from, Integer to) {
- StreamNode fromVertex = getStreamNode(from);
- StreamNode toVertex = getStreamNode(to);
-
- toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
- toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
- }
-
- public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
- getStreamNode(vertexID).setSerializerOut(outType.createSerializer(executionConfig));
- }
-
- public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<OUT> operatorObject) {
- getStreamNode(vertexID).setOperator(operatorObject);
- }
-
- public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
- getStreamNode(vertexID).setInputFormat(inputFormat);
- }
-
- public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
- StreamNode node = getStreamNode(vertexID);
- if (node == null) {
- return;
- }
-
- switch (strategy) {
- case ISOLATE:
- node.isolateSlot();
- break;
- case NEWGROUP:
- node.startNewSlotSharingGroup();
- break;
- default:
- throw new IllegalArgumentException("Unknown resource strategy");
- }
- }
-
- public StreamNode getStreamNode(Integer vertexID) {
- return streamNodes.get(vertexID);
- }
-
- protected Collection<? extends Integer> getVertexIDs() {
- return streamNodes.keySet();
- }
-
- public StreamEdge getStreamEdge(int sourceId, int targetId) {
- for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) {
- if (edge.getTargetId() == targetId) {
- return edge;
- }
- }
-
- throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
- }
-
- public Collection<Integer> getSourceIDs() {
- return sources;
- }
-
-
- public Collection<Integer> getSinkIDs() {
- return sinks;
- }
-
- public Collection<StreamNode> getStreamNodes() {
- return streamNodes.values();
- }
-
- public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
- Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?>>>();
- for (StreamNode vertex : streamNodes.values()) {
- operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getId(), vertex
- .getOperator()));
- }
- return operatorSet;
- }
-
- public String getBrokerID(Integer vertexID) {
- return vertexIDtoBrokerID.get(vertexID);
- }
-
- public long getLoopTimeout(Integer vertexID) {
- return vertexIDtoLoopTimeout.get(vertexID);
- }
-
- public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
-
- StreamNode source = this.addNode(sourceId,
- StreamIterationHead.class,
- null,
- null);
- sources.add(source.getId());
- setParallelism(source.getId(), parallelism);
-
- StreamNode sink = this.addNode(sinkId,
- StreamIterationTail.class,
- null,
- null);
- sinks.add(sink.getId());
- setParallelism(sink.getId(), parallelism);
-
- iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));
-
- source.setOperatorName("IterationSource-" + loopId);
- sink.setOperatorName("IterationSink-" + loopId);
- this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
- this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
- this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
- this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);
-
- return new Tuple2<StreamNode, StreamNode>(source, sink);
- }
-
- public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
- return iterationSourceSinkPairs;
- }
-
- protected void removeEdge(StreamEdge edge) {
-
- edge.getSourceVertex().getOutEdges().remove(edge);
- edge.getTargetVertex().getInEdges().remove(edge);
-
- }
-
- protected void removeVertex(StreamNode toRemove) {
-
- Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
-
- edgesToRemove.addAll(toRemove.getInEdges());
- edgesToRemove.addAll(toRemove.getOutEdges());
-
- for (StreamEdge edge : edgesToRemove) {
- removeEdge(edge);
- }
- streamNodes.remove(toRemove.getId());
- }
-
- /**
- * Gets the assembled {@link JobGraph}, using the default job name.
- */
- public JobGraph getJobGraph() {
- return getJobGraph(jobName);
- }
-
- /**
- * Gets the assembled {@link JobGraph}, using a user-specified job name.
- *
- * @param jobGraphName
- * name of the job graph
- */
- public JobGraph getJobGraph(String jobGraphName) {
- // temporarily forbid checkpointing for iterative jobs
- if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
- throw new UnsupportedOperationException(
- "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
- + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
- + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
- }
-
- setJobName(jobGraphName);
-
- StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
-
- return jobgraphGenerator.createJobGraph(jobGraphName);
- }
-
- @Override
- public String getStreamingPlanAsJSON() {
-
- try {
- return new JSONGenerator(this).getJSON();
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("JSON plan creation failed: {}", e);
- }
- return "";
- }
-
- }
-
- @Override
- public void dumpStreamingPlanAsJSON(File file) throws IOException {
- PrintWriter pw = null;
- try {
- pw = new PrintWriter(new FileOutputStream(file), false);
- pw.write(getStreamingPlanAsJSON());
- pw.flush();
-
- } finally {
- if (pw != null) {
- pw.close();
- }
- }
- }
-
- public enum ResourceStrategy {
- DEFAULT, ISOLATE, NEWGROUP
- }
-
-}
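The virtual-node mechanism can be exercised directly against the public methods of this class. A hedged sketch: StreamMap, ShufflePartitioner and BasicTypeInfo are assumed era classes outside this diff, while addOperator, addVirtualPartitionNode, addEdge and getStreamEdge are the methods shown in the file. Connecting an edge from the virtual node 2 resolves to a physical edge 1 -> 3 that carries the partitioner:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;

public class VirtualNodeSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamGraph graph = new StreamGraph(env);

        MapFunction<Integer, Integer> identity = new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return value;
            }
        };

        // two real nodes (IDs 1 and 3) and one virtual partition node (ID 2)
        graph.addOperator(1, new StreamMap<Integer, Integer>(identity),
                BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, "Map-1");
        graph.addOperator(3, new StreamMap<Integer, Integer>(identity),
                BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, "Map-3");
        graph.addVirtualPartitionNode(1, 2, new ShufflePartitioner<Integer>());

        // the edge 2 -> 3 is resolved to the physical edge 1 -> 3
        graph.addEdge(2, 3, 0);
        System.out.println(graph.getStreamEdge(1, 3).getPartitioner()); // the shuffle partitioner
    }
}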
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
deleted file mode 100644
index 4a87eb3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.graph;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.FileSourceFunction;
-import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
-import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.SelectTransformation;
-import org.apache.flink.streaming.api.transformations.SinkTransformation;
-import org.apache.flink.streaming.api.transformations.SourceTransformation;
-import org.apache.flink.streaming.api.transformations.SplitTransformation;
-import org.apache.flink.streaming.api.transformations.StreamTransformation;
-import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
-import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A generator that generates a {@link StreamGraph} from a graph of
- * {@link StreamTransformation StreamTransformations}.
- *
- * <p>
- * This traverses the tree of {@code StreamTransformations} starting from the sinks. At each
- * transformation we recursively transform the inputs, then create a node in the {@code StreamGraph}
- * and add edges from the input Nodes to our newly created node. The transformation methods
- * return the IDs of the nodes in the StreamGraph that represent the input transformation. Several
- * IDs can be returned to be able to deal with feedback transformations and unions.
- *
- * <p>
- * Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
- * these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
- * partitioning, selector and so on. When an edge is created from a virtual node to a downstream
- * node, the {@code StreamGraph} resolves the ID of the original node and creates an edge
- * in the graph with the desired property. For example, if you have this graph:
- *
- * <pre>
- * Map-1 -> HashPartition-2 -> Map-3
- * </pre>
- *
- * where the numbers represent transformation IDs. We first recurse all the way down. {@code Map-1}
- * is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
- * {@code HashPartition}; for this, we create a virtual node with ID 4 that holds the property
- * {@code HashPartition}. This transformation returns the ID 4. Then we transform {@code Map-3}.
- * We add the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and
- * creates an edge {@code 1 -> 3} with the property HashPartition.
- */
-public class StreamGraphGenerator {
-
- private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
-
- // The StreamGraph that is being built; it is initialized in the constructor.
- private StreamGraph streamGraph;
-
- private final StreamExecutionEnvironment env;
-
- // This is used to assign a unique ID to iteration source/sink
- protected static Integer iterationIdCounter = 0;
- public static int getNewIterationNodeId() {
- iterationIdCounter--;
- return iterationIdCounter;
- }
-
- // Keep track of which transformations we have already transformed; this is necessary
- // because we have loops, i.e. feedback edges.
- private Map<StreamTransformation<?>, Collection<Integer>> alreadyTransformed;
-
-
- /**
- * Private constructor. The generator should only be invoked using {@link #generate}.
- */
- private StreamGraphGenerator(StreamExecutionEnvironment env) {
- this.streamGraph = new StreamGraph(env);
- this.streamGraph.setChaining(env.isChainingEnabled());
-
- if (env.getCheckpointInterval() > 0) {
- this.streamGraph.setCheckpointingEnabled(true);
- this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
- this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
- }
- this.streamGraph.setStateBackend(env.getStateBackend());
- if (env.isForceCheckpointing()) {
- this.streamGraph.forceCheckpoint();
- }
-
- this.env = env;
- this.alreadyTransformed = new HashMap<>();
- }
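A note on the constructor above: the checkpointing branch only fires when an interval has
been configured on the environment. A minimal, hedged sketch of the user-facing calls that
feed these settings (standard StreamExecutionEnvironment methods; the values are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvSettingsSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // a positive interval makes the generator enable checkpointing on the StreamGraph
            env.enableCheckpointing(5000);
            // chaining is enabled by default; disabling it is propagated via setChaining(...)
            env.disableOperatorChaining();
        }
    }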
-
- /**
- * Generates a {@code StreamGraph} by traversing the graph of {@code StreamTransformations}
- * starting from the given transformations.
- *
- * @param env The {@code StreamExecutionEnvironment} that is used to set some parameters of the
- * job
- * @param transformations The transformations starting from which to transform the graph
- *
- * @return The generated {@code StreamGraph}
- */
- public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
- return new StreamGraphGenerator(env).generateInternal(transformations);
- }
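For context, a hedged sketch of a pipeline whose translation ends up in generate(). The
DataStream calls are standard API; the program itself is illustrative:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GenerateSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3)                       // SourceTransformation
                .map(new MapFunction<Integer, Integer>() {  // OneInputTransformation
                    @Override
                    public Integer map(Integer value) {
                        return 2 * value;
                    }
                })
                .print();                                   // SinkTransformation
            // getExecutionPlan() triggers StreamGraphGenerator.generate(...) internally
            System.out.println(env.getExecutionPlan());
        }
    }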
-
- /**
- * This starts the actual transformation, beginning from the sinks.
- */
- private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
- for (StreamTransformation<?> transformation: transformations) {
- transform(transformation);
- }
- return streamGraph;
- }
-
- /**
- * Transforms one {@code StreamTransformation}.
- *
- * <p>
- * This checks whether we have already transformed it and exits early in that case. If not, it
- * delegates to one of the transformation-specific methods.
- */
- private Collection<Integer> transform(StreamTransformation<?> transform) {
-
- if (alreadyTransformed.containsKey(transform)) {
- return alreadyTransformed.get(transform);
- }
-
- LOG.debug("Transforming {}", transform);
-
- // call at least once to trigger exceptions about MissingTypeInfo
- transform.getOutputType();
-
- Collection<Integer> transformedIds;
- if (transform instanceof OneInputTransformation<?, ?>) {
- transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
- } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
- transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
- } else if (transform instanceof SourceTransformation<?>) {
- transformedIds = transformSource((SourceTransformation<?>) transform);
- } else if (transform instanceof SinkTransformation<?>) {
- transformedIds = transformSink((SinkTransformation<?>) transform);
- } else if (transform instanceof UnionTransformation<?>) {
- transformedIds = transformUnion((UnionTransformation<?>) transform);
- } else if (transform instanceof SplitTransformation<?>) {
- transformedIds = transformSplit((SplitTransformation<?>) transform);
- } else if (transform instanceof SelectTransformation<?>) {
- transformedIds = transformSelect((SelectTransformation<?>) transform);
- } else if (transform instanceof FeedbackTransformation<?>) {
- transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
- } else if (transform instanceof CoFeedbackTransformation<?>) {
- transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
- } else if (transform instanceof PartitionTransformation<?>) {
- transformedIds = transformPartition((PartitionTransformation<?>) transform);
- } else {
- throw new IllegalStateException("Unknown transformation: " + transform);
- }
-
- // need this check because the iterate transformation adds itself before
- // transforming the feedback edges
- if (!alreadyTransformed.containsKey(transform)) {
- alreadyTransformed.put(transform, transformedIds);
- }
-
- if (transform.getBufferTimeout() > 0) {
- streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
- }
- if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
- streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
- }
-
- return transformedIds;
- }
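The method above is a memoized depth-first traversal; the early exit via alreadyTransformed
is what terminates recursion when feedback edges close a cycle. A self-contained toy model of
just that mechanism (hypothetical names, not Flink code):

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // toy graph: node name -> input node names (may contain back-edges)
    class MemoizedTraversal {
        private final Map<String, List<String>> inputs = new HashMap<String, List<String>>();
        private final Map<String, Collection<Integer>> alreadyTransformed =
                new HashMap<String, Collection<Integer>>();
        private int nextId = 0;

        Collection<Integer> transform(String node) {
            if (alreadyTransformed.containsKey(node)) {
                return alreadyTransformed.get(node);   // early exit breaks cycles
            }
            List<String> nodeInputs = inputs.containsKey(node)
                    ? inputs.get(node) : Collections.<String>emptyList();
            for (String input : nodeInputs) {
                transform(input);                      // transform inputs first
            }
            Collection<Integer> ids = Collections.singleton(nextId++);
            // an iteration-style node may have registered itself while we recursed
            if (!alreadyTransformed.containsKey(node)) {
                alreadyTransformed.put(node, ids);
            }
            return ids;
        }
    }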
-
- /**
- * Transforms a {@code UnionTransformation}.
- *
- * <p>
- * This is easy: we only have to transform the inputs and return all the IDs in a list so
- * that downstream operations can connect to all upstream nodes.
- */
- private <T> Collection<Integer> transformUnion(UnionTransformation<T> union) {
- List<StreamTransformation<T>> inputs = union.getInputs();
- List<Integer> resultIds = new ArrayList<>();
-
- for (StreamTransformation<T> input: inputs) {
- resultIds.addAll(transform(input));
- }
-
- return resultIds;
- }
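In API terms, a UnionTransformation comes from DataStream#union; a hedged fragment (assuming
an env as in the sketch near generate() above):

    DataStream<Integer> first = env.fromElements(1, 2);
    DataStream<Integer> second = env.fromElements(3, 4);
    // union() yields a UnionTransformation; downstream operators get wired to both input IDs
    first.union(second).print();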
-
- /**
- * Transforms a {@code PartitionTransformation}.
- *
- * <p>
- * For this we create a virtual node in the {@code StreamGraph} that holds the partition
- * property; see {@link StreamGraphGenerator}.
- */
- private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
- StreamTransformation<T> input = partition.getInput();
- List<Integer> resultIds = new ArrayList<>();
-
- Collection<Integer> transformedIds = transform(input);
- for (Integer transformedId: transformedIds) {
- int virtualId = StreamTransformation.getNewNodeId();
- streamGraph.addVirtualPartitionNode(transformedId, virtualId, partition.getPartitioner());
- resultIds.add(virtualId);
- }
-
- return resultIds;
- }
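To make the virtual-node indirection concrete, a hedged, self-contained toy (hypothetical
names and types, not the real StreamGraph) of how an edge from a virtual partition node is
resolved to the original upstream node:

    import java.util.HashMap;
    import java.util.Map;

    class VirtualNodeToy {
        private final Map<Integer, Integer> virtualToOriginal = new HashMap<Integer, Integer>();
        private final Map<Integer, String> virtualPartitioner = new HashMap<Integer, String>();

        void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
            virtualToOriginal.put(virtualId, originalId);
            virtualPartitioner.put(virtualId, partitioner);
        }

        void addEdge(int upstreamId, int downstreamId) {
            // resolve a virtual upstream ID back to the node that actually exists
            String partitioner = virtualPartitioner.containsKey(upstreamId)
                    ? virtualPartitioner.get(upstreamId) : "FORWARD";
            int resolved = virtualToOriginal.containsKey(upstreamId)
                    ? virtualToOriginal.get(upstreamId) : upstreamId;
            System.out.println(resolved + " -> " + downstreamId + " [" + partitioner + "]");
        }

        public static void main(String[] args) {
            VirtualNodeToy graph = new VirtualNodeToy();
            // Map-1 -> HashPartition-4 (virtual) -> Map-3, as in the class Javadoc example
            graph.addVirtualPartitionNode(1, 4, "HASH");
            graph.addEdge(4, 3);   // prints: 1 -> 3 [HASH]
        }
    }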
-
- /**
- * Transforms a {@code SplitTransformation}.
- *
- * <p>
- * We add the output selector to previously transformed nodes.
- */
- private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
-
- StreamTransformation<T> input = split.getInput();
- Collection<Integer> resultIds = transform(input);
-
- // the recursive transform call might have transformed this already
- if (alreadyTransformed.containsKey(split)) {
- return alreadyTransformed.get(split);
- }
-
- for (int inputId : resultIds) {
- streamGraph.addOutputSelector(inputId, split.getOutputSelector());
- }
-
- return resultIds;
- }
-
- /**
- * Transforms a {@code SelectTransformation}.
- *
- * <p>
- * For this we create a virtual node in the {@code StreamGraph} that holds the selected names.
- * @see org.apache.flink.streaming.api.graph.StreamGraphGenerator
- */
- private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
- StreamTransformation<T> input = select.getInput();
- Collection<Integer> resultIds = transform(input);
-
- // the recursive transform might have already transformed this
- if (alreadyTransformed.containsKey(select)) {
- return alreadyTransformed.get(select);
- }
-
- List<Integer> virtualResultIds = new ArrayList<>();
-
- for (int inputId : resultIds) {
- int virtualId = StreamTransformation.getNewNodeId();
- streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
- virtualResultIds.add(virtualId);
- }
- return virtualResultIds;
- }
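Split and select are the API calls behind these two transformations; a hedged fragment
(assuming an env as above, plus java.util.Collections and the OutputSelector import used
elsewhere in this commit):

    DataStream<Integer> input = env.fromElements(1, 2, 3, 4);
    // split() attaches an OutputSelector to the upstream node (SplitTransformation);
    // select() adds a virtual node that carries the chosen names (SelectTransformation)
    input.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
        }
    }).select("even").print();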
-
- /**
- * Transforms a {@code FeedbackTransformation}.
- *
- * <p>
- * This will recursively transform the input and the feedback edges. We return the concatenation
- * of the input IDs and the feedback IDs so that downstream operations can be wired to both.
- *
- * <p>
- * This is responsible for creating the IterationSource and IterationSink which
- * are used to feed back the elements.
- */
- private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
-
- if (iterate.getFeedbackEdges().isEmpty()) {
- throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
- }
-
- StreamTransformation<T> input = iterate.getInput();
- List<Integer> resultIds = new ArrayList<>();
-
- // first transform the input stream(s) and store the result IDs
- resultIds.addAll(transform(input));
-
- // the recursive transform might have already transformed this
- if (alreadyTransformed.containsKey(iterate)) {
- return alreadyTransformed.get(iterate);
- }
-
- // create the fake iteration source/sink pair
- Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
- iterate.getId(),
- getNewIterationNodeId(),
- getNewIterationNodeId(),
- iterate.getWaitTime(),
- iterate.getParallelism());
-
- StreamNode itSource = itSourceAndSink.f0;
- StreamNode itSink = itSourceAndSink.f1;
-
- // We set the proper serializers for the sink/source
- streamGraph.setSerializers(itSource.getId(), null, null, iterate.getOutputType().createSerializer(env.getConfig()));
- streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(env.getConfig()), null, null);
-
- // also add the feedback source ID to the result IDs, so that downstream operators will
- // add both as input
- resultIds.add(itSource.getId());
-
- // add the iterate to the already-seen set with the result IDs, so that we can transform
- // the feedback edges and let them stop when encountering the iterate node
- alreadyTransformed.put(iterate, resultIds);
-
- for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
- Collection<Integer> feedbackIds = transform(feedbackEdge);
- for (Integer feedbackId: feedbackIds) {
- streamGraph.addEdge(feedbackId, itSink.getId(), 0);
- }
- }
-
- return resultIds;
- }
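A FeedbackTransformation originates from DataStream#iterate. A hedged, self-contained sketch
of a program whose feedback edge this method would wire to the iteration sink (class names as
of this version of the API; the countdown logic is purely illustrative):

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IterateSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Long> input = env.generateSequence(1, 10);

            IterativeStream<Long> iteration = input.iterate(5000); // waitTime in milliseconds
            DataStream<Long> step = iteration.map(new MapFunction<Long, Long>() {
                @Override
                public Long map(Long value) {
                    return value - 1;
                }
            });
            // closeWith() registers the feedback edge handled by transformFeedback()
            iteration.closeWith(step.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return value > 0;
                }
            }));
            step.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return value <= 0;
                }
            }).print();

            env.execute("iterate sketch");
        }
    }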
-
- /**
- * Transforms a {@code CoFeedbackTransformation}.
- *
- * <p>
- * This will only transform feedback edges, the result of this transform will be wired
- * to the second input of a Co-Transform. The original input is wired directly to the first
- * input of the downstream Co-Transform.
- *
- * <p>
- * This is responsible for creating the IterationSource and IterationSink which
- * are used to feed back the elements.
- */
- private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
-
- // For a Co-Iteration we neither transform the input nor return its IDs: the input is
- // wired directly to the first (left) input of the downstream co-operation. This transform
- // only needs to return the IDs of the feedback edges, since those must be wired to the
- // second input of the co-operation.
-
- // create the fake iteration source/sink pair
- Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
- coIterate.getId(),
- getNewIterationNodeId(),
- getNewIterationNodeId(),
- coIterate.getWaitTime(),
- coIterate.getParallelism());
-
- StreamNode itSource = itSourceAndSink.f0;
- StreamNode itSink = itSourceAndSink.f1;
-
- // We set the proper serializers for the sink/source
- streamGraph.setSerializers(itSource.getId(), null, null, coIterate.getOutputType().createSerializer(env.getConfig()));
- streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(env.getConfig()), null, null);
-
- Collection<Integer> resultIds = Collections.singleton(itSource.getId());
-
- // add the co-iterate to the already-seen set with the result IDs, so that we can transform
- // the feedback edges and let them stop when encountering the iterate node
- alreadyTransformed.put(coIterate, resultIds);
-
- for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
- Collection<Integer> feedbackIds = transform(feedbackEdge);
- for (Integer feedbackId: feedbackIds) {
- streamGraph.addEdge(feedbackId, itSink.getId(), 0);
- }
- }
-
- return resultIds;
- }
-
- /**
- * Transforms a {@code SourceTransformation}.
- */
- private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
- streamGraph.addSource(source.getId(),
- source.getOperator(),
- null,
- source.getOutputType(),
- "Source: " + source.getName());
- if (source.getOperator().getUserFunction() instanceof FileSourceFunction) {
- FileSourceFunction<T> fs = (FileSourceFunction<T>) source.getOperator().getUserFunction();
- streamGraph.setInputFormat(source.getId(), fs.getFormat());
- }
- streamGraph.setParallelism(source.getId(), source.getParallelism());
- return Collections.singleton(source.getId());
- }
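The FileSourceFunction special case fires for file-based sources; a hedged fragment (assuming
an env as above, and a hypothetical input path):

    // readTextFile() is backed by a FileSourceFunction, so the generator also
    // registers the underlying InputFormat on the StreamGraph for this source
    DataStream<String> lines = env.readTextFile("file:///tmp/input.txt");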
-
- /**
- * Transforms a {@code SinkTransformation}.
- */
- private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
-
- Collection<Integer> inputIds = transform(sink.getInput());
-
- streamGraph.addSink(sink.getId(),
- sink.getOperator(),
- sink.getInput().getOutputType(),
- null,
- "Sink: " + sink.getName());
-
- streamGraph.setParallelism(sink.getId(), sink.getParallelism());
-
- for (Integer inputId: inputIds) {
- streamGraph.addEdge(inputId, sink.getId(), 0);
- }
-
- if (sink.getStateKeySelector() != null) {
- TypeSerializer<?> keySerializer = sink.getStateKeyType().createSerializer(env.getConfig());
- streamGraph.setKey(sink.getId(), sink.getStateKeySelector(), keySerializer);
- }
-
- return Collections.emptyList();
- }
-
- /**
- * Transforms a {@code OneInputTransformation}.
- *
- * <p>
- * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
- * wires the inputs to this new node.
- */
- private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
-
- Collection<Integer> inputIds = transform(transform.getInput());
-
- // the recursive call might have already transformed this
- if (alreadyTransformed.containsKey(transform)) {
- return alreadyTransformed.get(transform);
- }
-
- streamGraph.addOperator(transform.getId(),
- transform.getOperator(),
- transform.getInputType(),
- transform.getOutputType(),
- transform.getName());
-
- if (transform.getStateKeySelector() != null) {
- TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
- streamGraph.setKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
- }
-
- streamGraph.setParallelism(transform.getId(), transform.getParallelism());
-
- for (Integer inputId: inputIds) {
- streamGraph.addEdge(inputId, transform.getId(), 0);
- }
-
- return Collections.singleton(transform.getId());
- }
-
- /**
- * Transforms a {@code TwoInputTransformation}.
- *
- * <p>
- * This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
- * wires the inputs to this new node.
- */
- private <IN1, IN2, OUT> Collection<Integer> transformTwoInputTransform(TwoInputTransformation<IN1, IN2, OUT> transform) {
-
- Collection<Integer> inputIds1 = transform(transform.getInput1());
- Collection<Integer> inputIds2 = transform(transform.getInput2());
-
- // the recursive call might have already transformed this
- if (alreadyTransformed.containsKey(transform)) {
- return alreadyTransformed.get(transform);
- }
-
- streamGraph.addCoOperator(
- transform.getId(),
- transform.getOperator(),
- transform.getInputType1(),
- transform.getInputType2(),
- transform.getOutputType(),
- transform.getName());
-
- streamGraph.setParallelism(transform.getId(), transform.getParallelism());
-
- for (Integer inputId: inputIds1) {
- streamGraph.addEdge(inputId, transform.getId(), 1);
- }
-
- for (Integer inputId: inputIds2) {
- streamGraph.addEdge(inputId, transform.getId(), 2);
- }
-
- return Collections.singleton(transform.getId());
- }
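A TwoInputTransformation typically comes from connect() followed by a co-operator; the edge
type numbers 1 and 2 above correspond to the two inputs. A hedged fragment (assuming an env
as above, plus the CoMapFunction import from the co-functions package):

    DataStream<Integer> numbers = env.fromElements(1, 2);
    DataStream<String> strings = env.fromElements("a", "b");
    // connect().map(CoMapFunction) yields a TwoInputTransformation; numbers is wired
    // with type number 1, strings with type number 2
    numbers.connect(strings)
        .map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) {
                return String.valueOf(value);
            }

            @Override
            public String map2(String value) {
                return value;
            }
        })
        .print();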
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
deleted file mode 100644
index 608e648..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.graph;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-
-/**
- * Class representing an operator in a streaming program, with all of its
- * properties.
- */
-public class StreamNode implements Serializable {
-
- private static final long serialVersionUID = 1L;
- private static int currentSlotSharingIndex = 1;
-
- private transient StreamExecutionEnvironment env;
-
- private Integer id;
- private Integer parallelism = null;
- private Long bufferTimeout = null;
- private String operatorName;
- private Integer slotSharingID;
- private boolean isolatedSlot = false;
- private KeySelector<?,?> statePartitioner;
- private TypeSerializer<?> stateKeySerializer;
-
- private transient StreamOperator<?> operator;
- private List<OutputSelector<?>> outputSelectors;
- private TypeSerializer<?> typeSerializerIn1;
- private TypeSerializer<?> typeSerializerIn2;
- private TypeSerializer<?> typeSerializerOut;
-
- private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
- private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
-
- private Class<? extends AbstractInvokable> jobVertexClass;
-
- private InputFormat<?, ?> inputFormat;
-
- public StreamNode(StreamExecutionEnvironment env, Integer id, StreamOperator<?> operator,
- String operatorName, List<OutputSelector<?>> outputSelector,
- Class<? extends AbstractInvokable> jobVertexClass) {
- this.env = env;
- this.id = id;
- this.operatorName = operatorName;
- this.operator = operator;
- this.outputSelectors = outputSelector;
- this.jobVertexClass = jobVertexClass;
- this.slotSharingID = currentSlotSharingIndex;
- }
-
- public void addInEdge(StreamEdge inEdge) {
- if (inEdge.getTargetId() != getId()) {
- throw new IllegalArgumentException("Destination id doesn't match the StreamNode id");
- } else {
- inEdges.add(inEdge);
- }
- }
-
- public void addOutEdge(StreamEdge outEdge) {
- if (outEdge.getSourceId() != getId()) {
- throw new IllegalArgumentException("Source id doesn't match the StreamNode id");
- } else {
- outEdges.add(outEdge);
- }
- }
-
- public List<StreamEdge> getOutEdges() {
- return outEdges;
- }
-
- public List<StreamEdge> getInEdges() {
- return inEdges;
- }
-
- public List<Integer> getOutEdgeIndices() {
- List<Integer> outEdgeIndices = new ArrayList<Integer>();
-
- for (StreamEdge edge : outEdges) {
- outEdgeIndices.add(edge.getTargetId());
- }
-
- return outEdgeIndices;
- }
-
- public List<Integer> getInEdgeIndices() {
- List<Integer> inEdgeIndices = new ArrayList<Integer>();
-
- for (StreamEdge edge : inEdges) {
- inEdgeIndices.add(edge.getSourceId());
- }
-
- return inEdgeIndices;
- }
-
- public Integer getId() {
- return id;
- }
-
- public int getParallelism() {
- // fall back to the environment default if no parallelism was set explicitly
- if (parallelism == null || parallelism == -1) {
- return env.getParallelism();
- } else {
- return parallelism;
- }
- }
-
- public void setParallelism(Integer parallelism) {
- this.parallelism = parallelism;
- }
-
- public Long getBufferTimeout() {
- return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
- }
-
- public void setBufferTimeout(Long bufferTimeout) {
- this.bufferTimeout = bufferTimeout;
- }
-
- public StreamOperator<?> getOperator() {
- return operator;
- }
-
- public void setOperator(StreamOperator<?> operator) {
- this.operator = operator;
- }
-
- public String getOperatorName() {
- return operatorName;
- }
-
- public void setOperatorName(String operatorName) {
- this.operatorName = operatorName;
- }
-
- public List<OutputSelector<?>> getOutputSelectors() {
- return outputSelectors;
- }
-
- public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
- return OutputSelectorWrapperFactory.create(getOutputSelectors());
- }
-
- public void addOutputSelector(OutputSelector<?> outputSelector) {
- this.outputSelectors.add(outputSelector);
- }
-
- public TypeSerializer<?> getTypeSerializerIn1() {
- return typeSerializerIn1;
- }
-
- public void setSerializerIn1(TypeSerializer<?> typeSerializerIn1) {
- this.typeSerializerIn1 = typeSerializerIn1;
- }
-
- public TypeSerializer<?> getTypeSerializerIn2() {
- return typeSerializerIn2;
- }
-
- public void setSerializerIn2(TypeSerializer<?> typeSerializerIn2) {
- this.typeSerializerIn2 = typeSerializerIn2;
- }
-
- public TypeSerializer<?> getTypeSerializerOut() {
- return typeSerializerOut;
- }
-
- public void setSerializerOut(TypeSerializer<?> typeSerializerOut) {
- this.typeSerializerOut = typeSerializerOut;
- }
-
- public Class<? extends AbstractInvokable> getJobVertexClass() {
- return jobVertexClass;
- }
-
- public InputFormat<?, ?> getInputFormat() {
- return inputFormat;
- }
-
- public void setInputFormat(InputFormat<?, ?> inputFormat) {
- this.inputFormat = inputFormat;
- }
-
- public int getSlotSharingID() {
- return isolatedSlot ? -1 : slotSharingID;
- }
-
- public void startNewSlotSharingGroup() {
- this.slotSharingID = ++currentSlotSharingIndex;
- }
-
- public void isolateSlot() {
- isolatedSlot = true;
- }
-
- @Override
- public String toString() {
- return operatorName + "-" + id;
- }
-
- public KeySelector<?, ?> getStatePartitioner() {
- return statePartitioner;
- }
-
- public void setStatePartitioner(KeySelector<?, ?> statePartitioner) {
- this.statePartitioner = statePartitioner;
- }
-
- public TypeSerializer<?> getStateKeySerializer() {
- return stateKeySerializer;
- }
-
- public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
- this.stateKeySerializer = stateKeySerializer;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- StreamNode that = (StreamNode) o;
-
- return id.equals(that.id);
- }
-
- @Override
- public int hashCode() {
- return id.hashCode();
- }
-}