You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:55 UTC
[14/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
new file mode 100644
index 0000000..9fa4d94
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileSourceFunction.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+
+public class FileSourceFunction extends RichParallelSourceFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ private InputSplitProvider provider;
+
+ private InputFormat<String, ?> inputFormat;
+
+ private TypeInformation<String> typeInfo;
+
+ private volatile boolean isRunning;
+
+ public FileSourceFunction(InputFormat<String, ?> format, TypeInformation<String> typeInfo) {
+ this.inputFormat = format;
+ this.typeInfo = typeInfo;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
+ this.provider = context.getInputSplitProvider();
+ inputFormat.configure(context.getTaskStubParameters());
+ }
+
+ @Override
+ public void run(Collector<String> collector) throws Exception {
+ isRunning = true;
+ final TypeSerializer<String> serializer = typeInfo.createSerializer(getRuntimeContext()
+ .getExecutionConfig());
+ final Iterator<InputSplit> splitIterator = getInputSplits();
+ @SuppressWarnings("unchecked")
+ final InputFormat<String, InputSplit> format = (InputFormat<String, InputSplit>) this.inputFormat;
+ // Read the splits one after another until they run out or cancel() clears isRunning.
+ while (isRunning && splitIterator.hasNext()) {
+ final InputSplit split = splitIterator.next();
+ String record = serializer.createInstance();
+ format.open(split);
+ try {
+ while (isRunning && !format.reachedEnd()) {
+ if ((record = format.nextRecord(record)) != null) {
+ collector.collect(record);
+ }
+ }
+ } finally {
+ // Close every split that was opened, so the next open() does not leak its resources.
+ format.close();
+ }
+ }
+ // Only reached when no exception was thrown, exactly like the original control flow.
+ collector.close();
+ isRunning = false;
+ }
+
+ private Iterator<InputSplit> getInputSplits() {
+ // Adapts the InputSplitProvider to an Iterator; splits are requested lazily and buffered one ahead.
+ return new Iterator<InputSplit>() {
+
+ private InputSplit nextSplit;
+
+ private boolean exhausted;
+
+ @Override
+ public boolean hasNext() {
+ if (exhausted) {
+ return false;
+ }
+ // A split fetched by an earlier hasNext() has not been handed out yet.
+ if (nextSplit != null) {
+ return true;
+ }
+ // Ask the provider for the next split; a null answer marks the end of the input.
+ InputSplit split = provider.getNextInputSplit();
+
+ if (split != null) {
+ this.nextSplit = split;
+ return true;
+ } else {
+ exhausted = true;
+ return false;
+ }
+ }
+
+ @Override
+ public InputSplit next() {
+ if (this.nextSplit == null && !hasNext()) {
+ throw new NoSuchElementException();
+ }
+ // Hand out the buffered split and clear the buffer so the next call fetches a new one.
+ final InputSplit tmp = this.nextSplit;
+ this.nextSplit = null;
+ return tmp;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
new file mode 100644
index 0000000..5bc1eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+public class FromElementsFunction<T> implements SourceFunction<T> {
+ private static final long serialVersionUID = 1L;
+
+ Iterable<T> iterable;
+
+ private volatile boolean isRunning;
+
+ public FromElementsFunction(T... elements) {
+ this.iterable = Arrays.asList(elements);
+ }
+
+ public FromElementsFunction(Collection<T> elements) {
+ this.iterable = elements;
+ }
+
+ public FromElementsFunction(Iterable<T> elements) {
+ this.iterable = elements;
+ }
+
+ @Override
+ public void run(Collector<T> collector) throws Exception {
+ isRunning = true;
+ for (T element : iterable) {
+ // cancel() cleared the flag: stop before emitting anything further.
+ if (!isRunning) {
+ break;
+ }
+ collector.collect(element);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
new file mode 100644
index 0000000..4878c1b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenSequenceFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.NumberSequenceIterator;
+
+/**
+ * Source Function used to generate the number sequence
+ *
+ */
+public class GenSequenceFunction extends RichParallelSourceFunction<Long> {
+
+ private static final long serialVersionUID = 1L;
+
+ private NumberSequenceIterator fullIterator;
+ private NumberSequenceIterator splitIterator;
+
+ private volatile boolean isRunning;
+
+ public GenSequenceFunction(long from, long to) {
+ fullIterator = new NumberSequenceIterator(from, to);
+ }
+
+ @Override
+ public void run(Collector<Long> collector) throws Exception {
+ isRunning = true;
+ while (splitIterator.hasNext() && isRunning) {
+ collector.collect(splitIterator.next());
+ }
+ }
+
+ @Override
+ public void open(Configuration config) {
+ int splitNumber = getRuntimeContext().getIndexOfThisSubtask();
+ int numOfSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+ splitIterator = fullIterator.split(numOfSubTasks)[splitNumber];
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
new file mode 100644
index 0000000..72a18d0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+public interface GenericSourceFunction<T> {
+
+ public TypeInformation<T> getType();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
new file mode 100644
index 0000000..c39a372
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/ParallelSourceFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+/**
+ * A stream data source that is executed in parallel. Upon execution, the runtime will
+ * execute as many parallel instances of this function as the configured parallelism
+ * of the source.
+ *
+ * <p>This interface acts only as a marker to tell the system that this source may
+ * be executed in parallel. When different parallel instances are required to perform
+ * different tasks, use the {@link RichParallelSourceFunction} to get access to the runtime
+ * context, which reveals information like the number of parallel tasks, and which parallel
+ * task the current instance is.
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public interface ParallelSourceFunction<OUT> extends SourceFunction<OUT> {
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
new file mode 100644
index 0000000..fcbcbce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichParallelSourceFunction.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a parallel data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichParallelSourceFunction<OUT> extends AbstractRichFunction
+ implements ParallelSourceFunction<OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
new file mode 100644
index 0000000..dd08b2a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Base class for implementing a parallel data source that has access to context information
+ * (via {@link #getRuntimeContext()}) and additional life-cycle methods
+ * ({@link #open(org.apache.flink.configuration.Configuration)} and {@link #close()}).
+ *
+ * <p>This class is useful when implementing parallel sources where different parallel subtasks
+ * need to perform different work. Typical patterns for that are:
+ * <ul>
+ * <li>Use {@link #getRuntimeContext()} to obtain the runtime context.</li>
+ * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getNumberOfParallelSubtasks()}
+ * to determine the current parallelism. It is strongly encouraged to use this method, rather than
+ * hard-wiring the parallelism, because the configured parallelism may change depending on
+ * program configuration. The parallelism may also change after recovering from failures, when fewer
+ * than the desired number of parallel workers are available.</li>
+ * <li>Use {@link org.apache.flink.api.common.functions.RuntimeContext#getIndexOfThisSubtask()} to
+ * determine which subtask the current instance of the function executes.</li>
+ * </ul>
+ * </p>
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
new file mode 100644
index 0000000..b36ae39
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SocketTextStreamFunction.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SocketTextStreamFunction extends RichSourceFunction<String> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(SocketTextStreamFunction.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private String hostname;
+ private int port;
+ private char delimiter;
+ private long maxRetry;
+ private boolean retryForever;
+ private Socket socket;
+ private static final int CONNECTION_TIMEOUT_TIME = 0;
+ private static final int CONNECTION_RETRY_SLEEP = 1000;
+
+ private volatile boolean isRunning = false;
+
+ public SocketTextStreamFunction(String hostname, int port, char delimiter, long maxRetry) {
+ this.hostname = hostname;
+ this.port = port;
+ this.delimiter = delimiter;
+ this.maxRetry = maxRetry;
+ this.retryForever = maxRetry < 0;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ socket = new Socket();
+ socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
+ }
+
+ @Override
+ public void run(Collector<String> collector) throws Exception {
+ streamFromSocket(collector, socket);
+ }
+
+ public void streamFromSocket(Collector<String> collector, Socket socket) throws Exception {
+ isRunning = true;
+ try {
+ StringBuffer buffer = new StringBuffer();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ socket.getInputStream()));
+ // Consume the stream one character at a time until cancel() clears isRunning.
+ while (isRunning) {
+ int data;
+ try {
+ data = reader.read();
+ } catch (SocketException e) {
+ if (!isRunning) {
+ break;
+ } else {
+ throw e;
+ }
+ }
+ // read() returning -1 means the stream ended: drop the socket and try to reconnect.
+ if (data == -1) {
+ socket.close();
+ long retry = 0;
+ boolean success = false;
+ while (isRunning && (retryForever || retry < maxRetry) && !success) {
+ if (!retryForever) {
+ retry++;
+ }
+ LOG.warn("Lost connection to server socket. Retrying in "
+ + (CONNECTION_RETRY_SLEEP / 1000) + " seconds...");
+ try {
+ socket = new Socket();
+ socket.connect(new InetSocketAddress(hostname, port),
+ CONNECTION_TIMEOUT_TIME);
+ success = true;
+ } catch (ConnectException ce) {
+ Thread.sleep(CONNECTION_RETRY_SLEEP);
+ }
+ }
+ // The loop ends on success, on cancel, or after maxRetry failures (never, if retryForever).
+ if (success) {
+ LOG.info("Server socket is reconnected.");
+ } else {
+ LOG.error("Could not reconnect to server socket.");
+ break;
+ }
+ reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
+ continue;
+ }
+ // Emit the buffered text at each delimiter; otherwise keep accumulating characters.
+ if (data == delimiter) {
+ collector.collect(buffer.toString());
+ buffer = new StringBuffer();
+ } else if (data != '\r') { // ignore carriage return
+ buffer.append((char) data);
+ }
+ }
+ // Flush whatever was buffered after the last delimiter.
+ if (buffer.length() > 0) {
+ collector.collect(buffer.toString());
+ }
+ } finally {
+ socket.close();
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // Clear the flag first so the read loop treats the resulting SocketException as a stop signal.
+ isRunning = false;
+ if (socket != null && !socket.isClosed()) {
+ try {
+ socket.close();
+ } catch (IOException e) {
+ // Best effort: keep cancelling, but log the cause instead of dropping it.
+ LOG.error("Could not close open socket", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
new file mode 100644
index 0000000..9f0602f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.source;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface for a stream data source.
+ *
+ * <p>Sources implementing this specific interface are executed with
+ * parallelism 1. To execute your sources in parallel
+ * see {@link ParallelSourceFunction}.</p>
+ *
+ * @param <OUT> The type of the records produced by this source.
+ */
+public interface SourceFunction<OUT> extends Function, Serializable {
+
+ /**
+ * Main work method of the source. This function is invoked at the beginning of the
+ * source's life and is expected to produce its data by "pushing" the records into
+ * the given collector.
+ *
+ * @param collector The collector that forwards records to the source's consumers.
+ *
+ * @throws Exception Throwing any type of exception will cause the source to be considered
+ * failed. When fault tolerance is enabled, recovery will be triggered,
+ * which may create a new instance of this source.
+ */
+ public void run(Collector<OUT> collector) throws Exception;
+
+ /**
+ * This method signals the source function to cancel its operation.
+ * The method is called by the framework if the task is to be aborted prematurely.
+ * This happens when the user cancels the job, or when the task is canceled as
+ * part of a program failure and cleanup.
+ */
+ public void cancel();
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
new file mode 100644
index 0000000..8d8ded9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+public class JSONGenerator {
+
+ public static final String STEPS = "step_function";
+ public static final String ID = "id";
+ public static final String SIDE = "side";
+ public static final String SHIP_STRATEGY = "ship_strategy";
+ public static final String PREDECESSORS = "predecessors";
+ public static final String TYPE = "type";
+ public static final String PACT = "pact";
+ public static final String CONTENTS = "contents";
+ public static final String PARALLELISM = "parallelism";
+
+ private StreamGraph streamGraph;
+
+ public JSONGenerator(StreamGraph streamGraph) {
+ this.streamGraph = streamGraph;
+ }
+
+ public String getJSON() throws JSONException {
+ JSONObject json = new JSONObject();
+ JSONArray nodes = new JSONArray();
+ json.put("nodes", nodes);
+ List<Integer> operatorIDs = new ArrayList<Integer>(streamGraph.getVertexIDs());
+ Collections.sort(operatorIDs);
+ visit(nodes, operatorIDs, new HashMap<Integer, Integer>());
+ return json.toString();
+ }
+
+ private void visit(JSONArray jsonArray, List<Integer> toVisit,
+ Map<Integer, Integer> edgeRemapings) throws JSONException {
+ // Handles the first id in toVisit, then recurses until toVisit is empty.
+ Integer vertexID = toVisit.get(0);
+ StreamNode vertex = streamGraph.getVertex(vertexID);
+ // NOTE(review): getInEdges() yields StreamEdge but toVisit holds Integer ids; is disjoint() always true?
+ if (streamGraph.getSourceIDs().contains(vertexID)
+ || Collections.disjoint(vertex.getInEdges(), toVisit)) {
+ // Plain vertex: emit one decorated node, with its predecessors unless it is a source.
+ JSONObject node = new JSONObject();
+ decorateNode(vertexID, node);
+
+ if (!streamGraph.getSourceIDs().contains(vertexID)) {
+ JSONArray inputs = new JSONArray();
+ node.put(PREDECESSORS, inputs);
+
+ for (StreamEdge inEdge : vertex.getInEdges()) {
+ int inputID = inEdge.getSourceID();
+ // Inputs recorded in edgeRemapings are reported under the id they were remapped to.
+ Integer mappedID = (edgeRemapings.keySet().contains(inputID)) ? edgeRemapings
+ .get(inputID) : inputID;
+ decorateEdge(inputs, vertexID, mappedID, inputID);
+ }
+ }
+ jsonArray.put(node);
+ toVisit.remove(vertexID);
+ } else {
+ Integer iterationHead = -1;
+ for (StreamEdge inEdge : vertex.getInEdges()) {
+ int operator = inEdge.getSourceID();
+ // Keep the source id if vertexIDtoLoop contains it (the last match wins; -1 if none).
+ if (streamGraph.vertexIDtoLoop.containsKey(operator)) {
+ iterationHead = operator;
+ }
+ }
+ // Emit a single "IterativeDataStream" node whose steps are filled in by visitIteration.
+ JSONObject obj = new JSONObject();
+ JSONArray iterationSteps = new JSONArray();
+ obj.put(STEPS, iterationSteps);
+ obj.put(ID, iterationHead);
+ obj.put(PACT, "IterativeDataStream");
+ obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
+ obj.put(CONTENTS, "Stream Iteration");
+ JSONArray iterationInputs = new JSONArray();
+ obj.put(PREDECESSORS, iterationInputs);
+ toVisit.remove(iterationHead);
+ visitIteration(iterationSteps, toVisit, iterationHead, edgeRemapings, iterationInputs);
+ jsonArray.put(obj);
+ }
+
+ if (!toVisit.isEmpty()) {
+ visit(jsonArray, toVisit, edgeRemapings);
+ }
+ }
+
+ private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int headId,
+ Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {
+ // Consumes the first id in toVisit and recurses until an id contained in vertexIDtoLoop is consumed.
+ Integer vertexID = toVisit.get(0);
+ StreamNode vertex = streamGraph.getVertex(vertexID);
+ toVisit.remove(vertexID);
+
+ // Ignoring head and tail to avoid redundancy
+ if (!streamGraph.vertexIDtoLoop.containsKey(vertexID)) {
+ JSONObject obj = new JSONObject();
+ jsonArray.put(obj);
+ decorateNode(vertexID, obj);
+ JSONArray inEdges = new JSONArray();
+ obj.put(PREDECESSORS, inEdges);
+ // Already remapped inputs become edges of this step; other non-loop inputs feed iterationInEdges.
+ for (StreamEdge inEdge : vertex.getInEdges()) {
+ int inputID = inEdge.getSourceID();
+
+ if (edgeRemapings.keySet().contains(inputID)) {
+ decorateEdge(inEdges, vertexID, inputID, inputID);
+ } else if (!streamGraph.vertexIDtoLoop.containsKey(inputID)) {
+ decorateEdge(iterationInEdges, vertexID, inputID, inputID);
+ }
+ }
+ // Record the remapping so visit() reports later edges from this vertex under headId.
+ edgeRemapings.put(vertexID, headId);
+ visitIteration(jsonArray, toVisit, headId, edgeRemapings, iterationInEdges);
+ }
+
+ }
+
+ private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID, int inputID)
+ throws JSONException {
+ JSONObject input = new JSONObject();
+ input.put(ID, mappedInputID);
+ input.put(SHIP_STRATEGY, streamGraph.getEdge(inputID, vertexID).getPartitioner().getStrategy());
+ // Decide the side BEFORE appending: only a still-empty array means this is the first input.
+ input.put(SIDE, (inputArray.length() == 0) ? "first" : "second");
+ inputArray.put(input);
+ }
+
+ private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
+ // Fills the given JSON node with the id, type, pact, contents and parallelism of a vertex.
+ StreamNode streamNode = streamGraph.getVertex(vertexID);
+
+ node.put(ID, vertexID);
+ node.put(TYPE, streamNode.getOperatorName());
+
+ // Sources are labelled differently from all other vertices.
+ boolean isSource = streamGraph.getSourceIDs().contains(vertexID);
+ node.put(PACT, isSource ? "Data Source" : "Data Stream");
+
+ StreamOperator<?, ?> streamOperator = streamGraph.getVertex(vertexID).getOperator();
+ // Append the user function's simple class name when one is available.
+ String contents;
+ if (streamOperator == null || streamOperator.getUserFunction() == null) {
+ contents = streamNode.getOperatorName();
+ } else {
+ contents = streamNode.getOperatorName() + " at "
+ + streamOperator.getUserFunction().getClass().getSimpleName();
+ }
+ node.put(CONTENTS, contents);
+
+ node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
new file mode 100644
index 0000000..f5e771a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
+import org.apache.flink.util.InstantiationUtil;
+
+ /**
+  * Typed view over a {@link Configuration} holding the settings of one streaming
+  * vertex. Primitive settings are stored directly under string keys; objects
+  * (serializers, the operator, edge lists, chained configs) are stored as
+  * serialized byte arrays and read back with a caller-supplied class loader.
+  */
+ public class StreamConfig implements Serializable {
+
+     private static final long serialVersionUID = 1L;
+
+     private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
+     private static final String NUMBER_OF_INPUTS = "numberOfInputs";
+     private static final String CHAINED_OUTPUTS = "chainedOutputs";
+     private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
+     private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
+     private static final String OUTPUT_NAME = "outputName_";
+     private static final String VERTEX_NAME = "vertexID";
+     private static final String OPERATOR_NAME = "operatorName";
+     private static final String ITERATION_ID = "iterationId";
+     private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
+     private static final String SERIALIZEDUDF = "serializedUDF";
+     private static final String USER_FUNCTION = "userFunction";
+     private static final String BUFFER_TIMEOUT = "bufferTimeout";
+     private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
+     private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
+     private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
+     private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
+     private static final String ITERATION_WAIT = "iterationWait";
+     private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
+     private static final String EDGES_IN_ORDER = "edgesInOrder";
+     private static final String OUT_STREAM_EDGES = "outStreamEdges";
+     private static final String IN_STREAM_EDGES = "inStreamEdges";
+
+     // DEFAULT VALUES
+     private static final long DEFAULT_TIMEOUT = 100;
+     public static final String STATE_MONITORING = "STATE_MONITORING";
+
+     // CONFIG METHODS
+
+     private Configuration config;
+
+     /**
+      * Creates a view over the given configuration; all reads and writes go
+      * directly to that object.
+      */
+     public StreamConfig(Configuration config) {
+         this.config = config;
+     }
+
+     /** Returns the wrapped configuration. */
+     public Configuration getConfiguration() {
+         return config;
+     }
+
+     public void setVertexID(Integer vertexID) {
+         config.setInteger(VERTEX_NAME, vertexID);
+     }
+
+     /** Returns the stored vertex id, or -1 if none was set. */
+     public Integer getVertexID() {
+         return config.getInteger(VERTEX_NAME, -1);
+     }
+
+     public void setOperatorName(String name) {
+         config.setString(OPERATOR_NAME, name);
+     }
+
+     /** Returns the stored operator name, or "Missing" if none was set. */
+     public String getOperatorName() {
+         return config.getString(OPERATOR_NAME, "Missing");
+     }
+
+     public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
+         setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
+     }
+
+     public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
+         setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
+     }
+
+     public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) {
+         setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
+     }
+
+     public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) {
+         setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
+     }
+
+     /**
+      * Reads the first input serializer using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the serializer cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader cl) {
+         try {
+             return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+                     TYPE_SERIALIZER_IN_1, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate serializer.", e);
+         }
+     }
+
+     /**
+      * Reads the second input serializer using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the serializer cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader cl) {
+         try {
+             return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+                     TYPE_SERIALIZER_IN_2, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate serializer.", e);
+         }
+     }
+
+     /**
+      * Reads the first output serializer using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the serializer cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader cl) {
+         try {
+             return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+                     TYPE_SERIALIZER_OUT_1, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate serializer.", e);
+         }
+     }
+
+     /**
+      * Reads the second output serializer using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the serializer cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader cl) {
+         try {
+             return (StreamRecordSerializer<T>) InstantiationUtil.readObjectFromConfig(this.config,
+                     TYPE_SERIALIZER_OUT_2, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate serializer.", e);
+         }
+     }
+
+     private void setTypeSerializer(String key, StreamRecordSerializer<?> typeWrapper) {
+         config.setBytes(key, SerializationUtils.serialize(typeWrapper));
+     }
+
+     public void setBufferTimeout(long timeout) {
+         config.setLong(BUFFER_TIMEOUT, timeout);
+     }
+
+     /** Returns the stored buffer timeout, or the default of 100 if none was set. */
+     public long getBufferTimeout() {
+         return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
+     }
+
+     /**
+      * Stores the operator's class and its serialized form. A {@code null}
+      * operator leaves the configuration untouched.
+      *
+      * @throws RuntimeException
+      *             if the operator cannot be serialized
+      */
+     public void setStreamOperator(StreamOperator<?, ?> operator) {
+         if (operator != null) {
+             config.setClass(USER_FUNCTION, operator.getClass());
+
+             try {
+                 config.setBytes(SERIALIZEDUDF, SerializationUtils.serialize(operator));
+             } catch (SerializationException e) {
+                 throw new RuntimeException("Cannot serialize operator object "
+                         + operator.getClass(), e);
+             }
+         }
+     }
+
+     /**
+      * Reads the serialized operator back using the given class loader.
+      *
+      * @throws StreamTaskException
+      *             if the operator cannot be read
+      */
+     @SuppressWarnings({ "unchecked" })
+     public <T> T getStreamOperator(ClassLoader cl) {
+         try {
+             return (T) InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
+         } catch (Exception e) {
+             throw new StreamTaskException("Cannot instantiate user function", e);
+         }
+     }
+
+     /**
+      * Stores the serialized output selector wrapper.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the wrapper cannot be serialized
+      */
+     public void setOutputSelectorWrapper(OutputSelectorWrapper<?> outputSelectorWrapper) {
+         try {
+             config.setBytes(OUTPUT_SELECTOR_WRAPPER, SerializationUtils.serialize(outputSelectorWrapper));
+         } catch (SerializationException e) {
+             throw new RuntimeException("Cannot serialize OutputSelectorWrapper", e);
+         }
+     }
+
+     /**
+      * Reads the output selector wrapper using the given class loader.
+      *
+      * @throws StreamTaskException
+      *             if the wrapper cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public <T> OutputSelectorWrapper<T> getOutputSelectorWrapper(ClassLoader cl) {
+         try {
+             return (OutputSelectorWrapper<T>) InstantiationUtil.readObjectFromConfig(this.config,
+                     OUTPUT_SELECTOR_WRAPPER, cl);
+         } catch (Exception e) {
+             throw new StreamTaskException("Cannot deserialize and instantiate OutputSelectorWrapper", e);
+         }
+     }
+
+     public void setIterationId(Integer iterationId) {
+         config.setInteger(ITERATION_ID, iterationId);
+     }
+
+     /** Returns the stored iteration id, or 0 if none was set. */
+     public Integer getIterationId() {
+         return config.getInteger(ITERATION_ID, 0);
+     }
+
+     public void setIterationWaitTime(long time) {
+         config.setLong(ITERATION_WAIT, time);
+     }
+
+     /** Returns the stored iteration wait time, or 0 if none was set. */
+     public long getIterationWaitTime() {
+         return config.getLong(ITERATION_WAIT, 0);
+     }
+
+     /**
+      * Stores the selected output names for the given output; a {@code null}
+      * list is stored as an empty list.
+      */
+     public void setSelectedNames(Integer output, List<String> selected) {
+         if (selected != null) {
+             config.setBytes(OUTPUT_NAME + output,
+                     SerializationUtils.serialize((Serializable) selected));
+         } else {
+             config.setBytes(OUTPUT_NAME + output,
+                     SerializationUtils.serialize(new ArrayList<String>()));
+         }
+     }
+
+     /**
+      * Reads the selected output names stored for the given output.
+      * NOTE(review): when nothing was stored, {@code null} is handed to
+      * {@code SerializationUtils.deserialize}, which presumably rejects it with
+      * an exception - confirm against the commons-lang version in use.
+      */
+     @SuppressWarnings("unchecked")
+     public List<String> getSelectedNames(Integer output) {
+         return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
+                 null));
+     }
+
+     public void setNumberOfInputs(int numberOfInputs) {
+         config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
+     }
+
+     /** Returns the stored number of inputs, or 0 if none was set. */
+     public int getNumberOfInputs() {
+         return config.getInteger(NUMBER_OF_INPUTS, 0);
+     }
+
+     public void setNumberOfOutputs(int numberOfOutputs) {
+         config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
+     }
+
+     /** Returns the stored number of outputs, or 0 if none was set. */
+     public int getNumberOfOutputs() {
+         return config.getInteger(NUMBER_OF_OUTPUTS, 0);
+     }
+
+     public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
+         config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
+     }
+
+     /**
+      * Reads the non-chained output edges using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the list cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
+         try {
+             return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate outputs.", e);
+         }
+     }
+
+     public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
+         config.setBytes(CHAINED_OUTPUTS,
+                 SerializationUtils.serialize((Serializable) chainedOutputs));
+     }
+
+     /**
+      * Reads the chained output edges using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the list cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
+         try {
+             return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
+                     CHAINED_OUTPUTS, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate chained outputs.", e);
+         }
+     }
+
+     public void setOutEdges(List<StreamEdge> outEdges) {
+         config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
+     }
+
+     /**
+      * Reads the outgoing edges using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the list cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public List<StreamEdge> getOutEdges(ClassLoader cl) {
+         try {
+             return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+                     this.config, OUT_STREAM_EDGES, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate outputs.", e);
+         }
+     }
+
+     public void setInPhysicalEdges(List<StreamEdge> inEdges) {
+         config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
+     }
+
+     /**
+      * Reads the incoming physical edges using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the list cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public List<StreamEdge> getInPhysicalEdges(ClassLoader cl) {
+         try {
+             return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+                     this.config, IN_STREAM_EDGES, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate inputs.", e);
+         }
+     }
+
+     public void setStateMonitoring(boolean stateMonitoring) {
+         config.setBoolean(STATE_MONITORING, stateMonitoring);
+     }
+
+     /** Returns the stored state monitoring flag, or {@code false} if none was set. */
+     public boolean getStateMonitoring() {
+         return config.getBoolean(STATE_MONITORING, false);
+     }
+
+     public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
+         config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
+     }
+
+     /**
+      * Reads the ordered outgoing edges using the given class loader.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the list cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public List<StreamEdge> getOutEdgesInOrder(ClassLoader cl) {
+         try {
+             return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
+                     this.config, EDGES_IN_ORDER, cl);
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate outputs.", e);
+         }
+     }
+
+     public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
+         config.setBytes(CHAINED_TASK_CONFIG,
+                 SerializationUtils.serialize((Serializable) chainedTaskConfigs));
+     }
+
+     /**
+      * Reads the chained task configs using the given class loader; an empty
+      * map is returned when the stored value reads back as {@code null}.
+      *
+      * @throws RuntimeException
+      *             wrapping the underlying failure if the map cannot be read
+      */
+     @SuppressWarnings("unchecked")
+     public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
+         try {
+
+             Map<Integer, StreamConfig> confs = (Map<Integer, StreamConfig>) InstantiationUtil
+                     .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
+
+             return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
+         } catch (Exception e) {
+             throw new RuntimeException("Could not instantiate configuration.", e);
+         }
+     }
+
+     public void setChainStart() {
+         config.setBoolean(IS_CHAINED_VERTEX, true);
+     }
+
+     public boolean isChainStart() {
+         return config.getBoolean(IS_CHAINED_VERTEX, false);
+     }
+
+     /**
+      * Builds a multi-line, human-readable dump of this configuration, reading
+      * stored objects with this class's own class loader.
+      */
+     @Override
+     public String toString() {
+
+         ClassLoader cl = getClass().getClassLoader();
+
+         // Read each list once; either may read back as null when it was never
+         // set, so both are null-checked before being dereferenced.
+         List<StreamEdge> nonChainedOutputs = getNonChainedOutputs(cl);
+         List<StreamEdge> chainedOutputs = getChainedOutputs(cl);
+
+         StringBuilder builder = new StringBuilder();
+         builder.append("\n=======================");
+         builder.append("Stream Config");
+         builder.append("=======================");
+         builder.append("\nTask name: " + getVertexID());
+         builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
+         builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
+         builder.append("\nOutput names: " + nonChainedOutputs);
+         builder.append("\nPartitioning:");
+         if (nonChainedOutputs != null) {
+             for (StreamEdge output : nonChainedOutputs) {
+                 int outputname = output.getTargetID();
+                 builder.append("\n\t" + outputname + ": " + output.getPartitioner());
+             }
+         }
+
+         builder.append("\nChained subtasks: " + chainedOutputs);
+
+         try {
+             builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
+         } catch (Exception e) {
+             // Covers both a failed read and an operator that was never set.
+             builder.append("\nOperator: Missing");
+         }
+         builder.append("\nBuffer timeout: " + getBufferTimeout());
+         builder.append("\nState Monitoring: " + getStateMonitoring());
+         if (isChainStart() && chainedOutputs != null && chainedOutputs.size() > 0) {
+             builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
+             builder.append(getTransitiveChainedTaskConfigs(cl));
+         }
+
+         return builder.toString();
+     }
+ }
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
new file mode 100644
index 0000000..d34b21a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+
+/**
+ * An edge in the streaming topology. One edge like this does not necessarily
+ * gets converted to a connection between two job vertices (due to
+ * chaining/optimization).
+ */
+public class StreamEdge implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ final private String edgeId;
+
+ final private StreamNode sourceVertex;
+ final private StreamNode targetVertex;
+
+ /**
+ * The type number of the input for co-tasks.
+ */
+ final private int typeNumber;
+
+ /**
+ * A list of output names that the target vertex listens to (if there is
+ * output selection).
+ */
+ final private List<String> selectedNames;
+ final private StreamPartitioner<?> outputPartitioner;
+
+ public StreamEdge(StreamNode sourceVertex, StreamNode targetVertex, int typeNumber,
+ List<String> selectedNames, StreamPartitioner<?> outputPartitioner) {
+ this.sourceVertex = sourceVertex;
+ this.targetVertex = targetVertex;
+ this.typeNumber = typeNumber;
+ this.selectedNames = selectedNames;
+ this.outputPartitioner = outputPartitioner;
+
+ this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + selectedNames
+ + "_" + outputPartitioner;
+ }
+
+ public StreamNode getSourceVertex() {
+ return sourceVertex;
+ }
+
+ public StreamNode getTargetVertex() {
+ return targetVertex;
+ }
+
+ public int getSourceID() {
+ return sourceVertex.getID();
+ }
+
+ public int getTargetID() {
+ return targetVertex.getID();
+ }
+
+ public int getTypeNumber() {
+ return typeNumber;
+ }
+
+ public List<String> getSelectedNames() {
+ return selectedNames;
+ }
+
+ public StreamPartitioner<?> getPartitioner() {
+ return outputPartitioner;
+ }
+
+ @Override
+ public int hashCode() {
+ return edgeId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ StreamEdge that = (StreamEdge) o;
+
+ if (!edgeId.equals(that.edgeId)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "(" + sourceVertex + " -> " + targetVertex + ", typeNumber=" + typeNumber
+ + ", selectedNames=" + selectedNames + ", outputPartitioner=" + outputPartitioner
+ + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
new file mode 100644
index 0000000..bfeed28
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.MissingTypeInfo;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.CoStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class representing the streaming topology. It contains all the information
+ * necessary to build the jobgraph for the execution.
+ *
+ */
+public class StreamGraph extends StreamingPlan {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+ private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
+ private String jobName = DEAFULT_JOB_NAME;
+
+ private final StreamExecutionEnvironment environemnt;
+ private final ExecutionConfig executionConfig;
+
+ private boolean checkpointingEnabled = false;
+ private long checkpointingInterval = 5000;
+ private boolean chaining = true;
+
+ private final Map<Integer, StreamNode> streamNodes;
+ private final Set<Integer> sources;
+
+ private final Map<Integer, StreamLoop> streamLoops;
+ protected final Map<Integer, StreamLoop> vertexIDtoLoop;
+
+ public StreamGraph(StreamExecutionEnvironment environment) {
+
+ this.environemnt = environment;
+ executionConfig = environment.getConfig();
+
+ streamNodes = new HashMap<Integer, StreamNode>();
+ streamLoops = new HashMap<Integer, StreamLoop>();
+ vertexIDtoLoop = new HashMap<Integer, StreamGraph.StreamLoop>();
+ sources = new HashSet<Integer>();
+ }
+
+ protected ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public void setChaining(boolean chaining) {
+ this.chaining = chaining;
+ }
+
+ public void setCheckpointingEnabled(boolean checkpointingEnabled) {
+ this.checkpointingEnabled = checkpointingEnabled;
+ }
+
+ public void setCheckpointingInterval(long checkpointingInterval) {
+ this.checkpointingInterval = checkpointingInterval;
+ }
+
+ public long getCheckpointingInterval() {
+ return checkpointingInterval;
+ }
+
+ public boolean isChainingEnabled() {
+ return chaining;
+ }
+
+ public boolean isCheckpointingEnabled() {
+ return checkpointingEnabled;
+ }
+
+ public boolean isIterative() {
+ return !streamLoops.isEmpty();
+ }
+
+ public <IN, OUT> void addSource(Integer vertexID, StreamOperator<IN, OUT> operatorObject,
+ TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+ addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
+ sources.add(vertexID);
+ }
+
+ public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject,
+ TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+
+ addNode(vertexID, StreamTask.class, operatorObject, operatorName);
+
+ StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? new StreamRecordSerializer<IN>(
+ inTypeInfo, executionConfig) : null;
+
+ StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+ && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+ outTypeInfo, executionConfig) : null;
+
+ setSerializers(vertexID, inSerializer, null, outSerializer);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex: {}", vertexID);
+ }
+ }
+
+ public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
+ CoStreamOperator<IN1, IN2, OUT> taskoperatorObject, TypeInformation<IN1> in1TypeInfo,
+ TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
+
+ addNode(vertexID, CoStreamTask.class, taskoperatorObject, operatorName);
+
+ StreamRecordSerializer<OUT> outSerializer = (outTypeInfo != null)
+ && !(outTypeInfo instanceof MissingTypeInfo) ? new StreamRecordSerializer<OUT>(
+ outTypeInfo, executionConfig) : null;
+
+ setSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
+ new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), outSerializer);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CO-TASK: {}", vertexID);
+ }
+ }
+
+ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID,
+ long timeOut) {
+
+ addNode(vertexID, StreamIterationHead.class, null, null);
+
+ chaining = false;
+
+ StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
+ streamLoops.put(iterationID, iteration);
+ vertexIDtoLoop.put(vertexID, iteration);
+
+ setSerializersFrom(iterationHead, vertexID);
+ getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
+
+ int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
+ StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
+ .get(0).getPartitioner();
+
+ addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ITERATION SOURCE: {}", vertexID);
+ }
+
+ sources.add(vertexID);
+ }
+
+ public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
+ long waitTime) {
+
+ if (getVertex(iterationTail).getBufferTimeout() == 0) {
+ throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
+ }
+
+ addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
+ getVertex(iterationTail).getParallelism());
+
+ StreamLoop iteration = streamLoops.get(iterationID);
+ iteration.setTail(getVertex(iterationTail));
+ vertexIDtoLoop.put(vertexID, iteration);
+
+ setSerializersFrom(iterationTail, vertexID);
+ getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
+
+ setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
+ setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ITERATION SINK: {}", vertexID);
+ }
+
+ }
+
+ protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
+ StreamOperator<?, ?> operatorObject, String operatorName) {
+
+ StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
+ new ArrayList<OutputSelector<?>>(), vertexClass);
+
+ streamNodes.put(vertexID, vertex);
+
+ return vertex;
+ }
+
+ public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
+ StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
+
+ StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
+ getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
+ getVertex(edge.getSourceID()).addOutEdge(edge);
+ getVertex(edge.getTargetID()).addInEdge(edge);
+ }
+
+ public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
+ getVertex(vertexID).addOutputSelector(outputSelector);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Outputselector set for {}", vertexID);
+ }
+
+ }
+
+ public void setParallelism(Integer vertexID, int parallelism) {
+ getVertex(vertexID).setParallelism(parallelism);
+ }
+
+ public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
+ getVertex(vertexID).setBufferTimeout(bufferTimeout);
+ }
+
+ private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
+ StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
+ StreamNode vertex = getVertex(vertexID);
+ vertex.setSerializerIn1(in1);
+ vertex.setSerializerIn2(in2);
+ vertex.setSerializerOut(out);
+ }
+
+ private void setSerializersFrom(Integer from, Integer to) {
+ StreamNode fromVertex = getVertex(from);
+ StreamNode toVertex = getVertex(to);
+
+ toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
+ toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
+ }
+
+ public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
+ StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
+ executionConfig);
+ getVertex(vertexID).setSerializerOut(serializer);
+ }
+
+ public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject) {
+ getVertex(vertexID).setOperator(operatorObject);
+ }
+
+ public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
+ getVertex(vertexID).setInputFormat(inputFormat);
+ }
+
+ public StreamNode getVertex(Integer vertexID) {
+ return streamNodes.get(vertexID);
+ }
+
+ protected Collection<? extends Integer> getVertexIDs() {
+ return streamNodes.keySet();
+ }
+
+ protected StreamEdge getEdge(int sourceId, int targetId) {
+ Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator();
+ while (outIterator.hasNext()) {
+ StreamEdge edge = outIterator.next();
+
+ if (edge.getTargetID() == targetId) {
+ return edge;
+ }
+ }
+
+ throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
+ }
+
+ public Collection<Integer> getSourceIDs() {
+ return sources;
+ }
+
+ public Set<Tuple2<Integer, StreamOperator<?, ?>>> getOperators() {
+ Set<Tuple2<Integer, StreamOperator<?, ?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?, ?>>>();
+ for (StreamNode vertex : streamNodes.values()) {
+ operatorSet.add(new Tuple2<Integer, StreamOperator<?, ?>>(vertex.getID(), vertex
+ .getOperator()));
+ }
+ return operatorSet;
+ }
+
+ public Collection<StreamLoop> getStreamLoops() {
+ return streamLoops.values();
+ }
+
+ public Integer getLoopID(Integer vertexID) {
+ return vertexIDtoLoop.get(vertexID).getID();
+ }
+
+ public long getLoopTimeout(Integer vertexID) {
+ return vertexIDtoLoop.get(vertexID).getTimeout();
+ }
+
+ protected void removeEdge(StreamEdge edge) {
+
+ edge.getSourceVertex().getOutEdges().remove(edge);
+ edge.getTargetVertex().getInEdges().remove(edge);
+
+ }
+
+ protected void removeVertex(StreamNode toRemove) {
+
+ Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
+
+ edgesToRemove.addAll(toRemove.getInEdges());
+ edgesToRemove.addAll(toRemove.getOutEdges());
+
+ for (StreamEdge edge : edgesToRemove) {
+ removeEdge(edge);
+ }
+ streamNodes.remove(toRemove.getID());
+ }
+
+ /**
+ * Gets the assembled {@link JobGraph} and adds a default name for it.
+ */
+ public JobGraph getJobGraph() {
+ return getJobGraph(jobName);
+ }
+
+ /**
+ * Gets the assembled {@link JobGraph} and adds a user specified name for
+ * it.
+ *
+ * @param jobGraphName
+ * name of the jobGraph
+ */
+ public JobGraph getJobGraph(String jobGraphName) {
+
+ // temporarily forbid checkpointing for iterative jobs
+ if (isIterative() && isCheckpointingEnabled()) {
+ throw new UnsupportedOperationException(
+ "Checkpointing is currently not supported for iterative jobs!");
+ }
+
+ setJobName(jobGraphName);
+
+ WindowingOptimizer.optimizeGraph(this);
+
+ StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
+
+ return jobgraphGenerator.createJobGraph(jobGraphName);
+ }
+
+ @Override
+ public String getStreamingPlanAsJSON() {
+
+ WindowingOptimizer.optimizeGraph(this);
+
+ try {
+ return new JSONGenerator(this).getJSON();
+ } catch (JSONException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("JSON plan creation failed: {}", e);
+ }
+ return "";
+ }
+
+ }
+
+ @Override
+ public void dumpStreamingPlanAsJSON(File file) throws IOException {
+ PrintWriter pw = null;
+ try {
+ pw = new PrintWriter(new FileOutputStream(file), false);
+ pw.write(getStreamingPlanAsJSON());
+ pw.flush();
+
+ } finally {
+ if (pw != null) {
+ pw.close();
+ }
+ }
+ }
+
+ /**
+ * Object for representing loops in streaming programs.
+ *
+ */
+ protected static class StreamLoop {
+
+ private Integer loopID;
+
+ private StreamNode head;
+ private StreamNode tail;
+
+ private Long timeout;
+
+ public StreamLoop(Integer loopID, StreamNode head, Long timeout) {
+ this.loopID = loopID;
+ this.head = head;
+ this.timeout = timeout;
+ }
+
+ public Integer getID() {
+ return loopID;
+ }
+
+ public Long getTimeout() {
+ return timeout;
+ }
+
+ public void setTail(StreamNode tail) {
+ this.tail = tail;
+ }
+
+ public StreamNode getHead() {
+ return head;
+ }
+
+ public StreamNode getTail() {
+ return tail;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
new file mode 100644
index 0000000..cb07f42
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+
+/**
+ * Class representing the operators in the streaming programs, with all their
+ * properties.
+ *
+ */
+public class StreamNode implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ transient private StreamExecutionEnvironment env;
+
+ private Integer ID;
+ private Integer parallelism = null;
+ private Long bufferTimeout = null;
+ private String operatorName;
+
+ private StreamOperator<?, ?> operator;
+ private List<OutputSelector<?>> outputSelectors;
+ private StreamRecordSerializer<?> typeSerializerIn1;
+ private StreamRecordSerializer<?> typeSerializerIn2;
+ private StreamRecordSerializer<?> typeSerializerOut;
+
+ private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
+ private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
+
+ private Class<? extends AbstractInvokable> jobVertexClass;
+
+ private InputFormat<String, ?> inputFormat;
+
+ public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?, ?> operator,
+ String operatorName, List<OutputSelector<?>> outputSelector,
+ Class<? extends AbstractInvokable> jobVertexClass) {
+ this.env = env;
+ this.ID = ID;
+ this.operatorName = operatorName;
+ this.operator = operator;
+ this.outputSelectors = outputSelector;
+ this.jobVertexClass = jobVertexClass;
+ }
+
+ public void addInEdge(StreamEdge inEdge) {
+ if (inEdge.getTargetID() != getID()) {
+ throw new IllegalArgumentException("Destination ID doesn't match the StreamNode ID");
+ } else {
+ inEdges.add(inEdge);
+ }
+ }
+
+ public void addOutEdge(StreamEdge outEdge) {
+ if (outEdge.getSourceID() != getID()) {
+ throw new IllegalArgumentException("Source ID doesn't match the StreamNode ID");
+ } else {
+ outEdges.add(outEdge);
+ }
+ }
+
+ public List<StreamEdge> getOutEdges() {
+ return outEdges;
+ }
+
+ public List<StreamEdge> getInEdges() {
+ return inEdges;
+ }
+
+ public List<Integer> getOutEdgeIndices() {
+ List<Integer> outEdgeIndices = new ArrayList<Integer>();
+
+ for (StreamEdge edge : outEdges) {
+ outEdgeIndices.add(edge.getTargetID());
+ }
+
+ return outEdgeIndices;
+ }
+
+ public List<Integer> getInEdgeIndices() {
+ List<Integer> inEdgeIndices = new ArrayList<Integer>();
+
+ for (StreamEdge edge : inEdges) {
+ inEdgeIndices.add(edge.getSourceID());
+ }
+
+ return inEdgeIndices;
+ }
+
+ public Integer getID() {
+ return ID;
+ }
+
+ public Integer getParallelism() {
+ return parallelism != null ? parallelism : env.getParallelism();
+ }
+
+ public void setParallelism(Integer parallelism) {
+ this.parallelism = parallelism;
+ }
+
+ public Long getBufferTimeout() {
+ return bufferTimeout != null ? bufferTimeout : env.getBufferTimeout();
+ }
+
+ public void setBufferTimeout(Long bufferTimeout) {
+ this.bufferTimeout = bufferTimeout;
+ }
+
+ public StreamOperator<?, ?> getOperator() {
+ return operator;
+ }
+
+ public void setOperator(StreamOperator<?, ?> operator) {
+ this.operator = operator;
+ }
+
+ public String getOperatorName() {
+ return operatorName;
+ }
+
+ public void setOperatorName(String operatorName) {
+ this.operatorName = operatorName;
+ }
+
+ public List<OutputSelector<?>> getOutputSelectors() {
+ return outputSelectors;
+ }
+
+ public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
+ return OutputSelectorWrapperFactory.create(getOutputSelectors());
+ }
+
+ public void addOutputSelector(OutputSelector<?> outputSelector) {
+ this.outputSelectors.add(outputSelector);
+ }
+
+ public StreamRecordSerializer<?> getTypeSerializerIn1() {
+ return typeSerializerIn1;
+ }
+
+ public void setSerializerIn1(StreamRecordSerializer<?> typeSerializerIn1) {
+ this.typeSerializerIn1 = typeSerializerIn1;
+ }
+
+ public StreamRecordSerializer<?> getTypeSerializerIn2() {
+ return typeSerializerIn2;
+ }
+
+ public void setSerializerIn2(StreamRecordSerializer<?> typeSerializerIn2) {
+ this.typeSerializerIn2 = typeSerializerIn2;
+ }
+
+ public StreamRecordSerializer<?> getTypeSerializerOut() {
+ return typeSerializerOut;
+ }
+
+ public void setSerializerOut(StreamRecordSerializer<?> typeSerializerOut) {
+ this.typeSerializerOut = typeSerializerOut;
+ }
+
+ public Class<? extends AbstractInvokable> getJobVertexClass() {
+ return jobVertexClass;
+ }
+
+ public InputFormat<String, ?> getInputFormat() {
+ return inputFormat;
+ }
+
+ public void setInputFormat(InputFormat<String, ?> inputFormat) {
+ this.inputFormat = inputFormat;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
new file mode 100644
index 0000000..7b856a1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.PartitioningStrategy;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
+import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingJobGraphGenerator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
+
+ private StreamGraph streamGraph;
+
+ private Map<Integer, AbstractJobVertex> jobVertices;
+ private JobGraph jobGraph;
+ private Collection<Integer> builtVertices;
+
+ private List<StreamEdge> physicalEdgesInOrder;
+
+ private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+
+ private Map<Integer, StreamConfig> vertexConfigs;
+ private Map<Integer, String> chainedNames;
+
+ /**
+  * Creates a generator for the given stream graph. Only the reference is
+  * stored here; the working collections are set up when
+  * {@code createJobGraph} runs.
+  *
+  * @param streamGraph
+  *            graph that is translated into a job graph
+  */
+ public StreamingJobGraphGenerator(StreamGraph streamGraph) {
+     this.streamGraph = streamGraph;
+ }
+
+ /**
+  * Replaces every working collection of the generator with a fresh, empty
+  * instance.
+  */
+ private void init() {
+     // vertices of the job graph and the IDs already turned into vertices
+     jobVertices = new HashMap<Integer, AbstractJobVertex>();
+     builtVertices = new HashSet<Integer>();
+
+     // edges that connect job vertices, in the order they were connected
+     physicalEdgesInOrder = new ArrayList<StreamEdge>();
+
+     // per-vertex configuration, chained configurations and chained names
+     vertexConfigs = new HashMap<Integer, StreamConfig>();
+     chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
+     chainedNames = new HashMap<Integer, String>();
+ }
+
+ /**
+  * Builds the job graph for the stream graph of this generator.
+  *
+  * <p>
+  * The job graph is configured first (schedule mode, job type, checkpointing
+  * and, when checkpointing is on, execution retries). The working state is
+  * then reset and the chaining, physical-edge and slot-sharing steps run in
+  * that order.
+  *
+  * @param jobName
+  *            name given to the created job graph
+  * @return the created job graph
+  */
+ public JobGraph createJobGraph(String jobName) {
+     jobGraph = new JobGraph(jobName);
+
+     // Turn lazy scheduling off
+     jobGraph.setScheduleMode(ScheduleMode.ALL);
+     jobGraph.setJobType(JobGraph.JobType.STREAMING);
+     jobGraph.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
+     jobGraph.setCheckpointingInterval(streamGraph.getCheckpointingInterval());
+
+     if (jobGraph.isCheckpointingEnabled()) {
+         int configuredRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+         // -1 is replaced by Integer.MAX_VALUE; any other value is passed on unchanged
+         jobGraph.setNumberOfExecutionRetries(configuredRetries == -1 ? Integer.MAX_VALUE
+                 : configuredRetries);
+     }
+
+     init();
+     setChaining();
+     setPhysicalEdges();
+     setSlotSharing();
+
+     return jobGraph;
+ }
+
+ /**
+  * Groups the recorded physical edges by their target vertex, keeping the
+  * recorded order inside each group, and stores every group as the physical
+  * in-edges of the target vertex's configuration.
+  */
+ private void setPhysicalEdges() {
+     Map<Integer, List<StreamEdge>> inEdgesByTarget = new HashMap<Integer, List<StreamEdge>>();
+
+     for (StreamEdge physicalEdge : physicalEdgesInOrder) {
+         int targetId = physicalEdge.getTargetID();
+
+         List<StreamEdge> bucket = inEdgesByTarget.get(targetId);
+         if (bucket == null) {
+             // first edge seen for this target
+             bucket = new ArrayList<StreamEdge>();
+             inEdgesByTarget.put(targetId, bucket);
+         }
+         bucket.add(physicalEdge);
+     }
+
+     for (Map.Entry<Integer, List<StreamEdge>> entry : inEdgesByTarget.entrySet()) {
+         vertexConfigs.get(entry.getKey()).setInPhysicalEdges(entry.getValue());
+     }
+ }
+
+ /**
+  * Starts chain creation at every source of the stream graph; each source is
+  * both the start of its chain and the first node visited.
+  */
+ private void setChaining() {
+     for (Integer sourceId : streamGraph.getSourceIDs()) {
+         createChain(sourceId, sourceId);
+     }
+ }
+
+ /**
+  * Recursively builds the operator chain that starts at {@code startNode},
+  * visiting {@code current} as one member of that chain.
+  *
+  * <p>
+  * Out-edges of {@code current} are split into chainable and non-chainable
+  * ones. Chainable targets are visited as further members of the same chain;
+  * every non-chainable target starts a chain of its own. When {@code current}
+  * is the start node a job vertex is created and connected along all
+  * collected out-edges; otherwise the configuration of {@code current} is
+  * stored among the chained configurations of the start node.
+  *
+  * @param startNode
+  *            ID of the node at the start of the chain
+  * @param current
+  *            ID of the node being visited
+  * @return the non-chainable out-edges collected from {@code current} and
+  *         the chained nodes reached from it, or an empty list if a job
+  *         vertex was already built for {@code startNode}
+  */
+ private List<StreamEdge> createChain(Integer startNode, Integer current) {
+
+     // Nothing to do when the vertex of this chain already exists.
+     if (builtVertices.contains(startNode)) {
+         return new ArrayList<StreamEdge>();
+     }
+
+     List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
+
+     List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
+     List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
+
+     for (StreamEdge candidate : streamGraph.getVertex(current).getOutEdges()) {
+         List<StreamEdge> bucket = isChainable(candidate) ? chainableOutputs : nonChainableOutputs;
+         bucket.add(candidate);
+     }
+
+     // Members of this chain first ...
+     for (StreamEdge chained : chainableOutputs) {
+         transitiveOutEdges.addAll(createChain(startNode, chained.getTargetID()));
+     }
+
+     // ... then the chains that begin behind each non-chainable edge.
+     for (StreamEdge boundary : nonChainableOutputs) {
+         transitiveOutEdges.add(boundary);
+         createChain(boundary.getTargetID(), boundary.getTargetID());
+     }
+
+     // The names of all chained targets are known at this point.
+     chainedNames.put(current, createChainedName(current, chainableOutputs));
+
+     boolean isChainStart = current.equals(startNode);
+
+     StreamConfig config;
+     if (isChainStart) {
+         config = createProcessingVertex(startNode);
+     } else {
+         config = new StreamConfig(new Configuration());
+     }
+
+     setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);
+
+     if (isChainStart) {
+
+         config.setChainStart();
+         config.setOutEdgesInOrder(transitiveOutEdges);
+         config.setOutEdges(streamGraph.getVertex(current).getOutEdges());
+
+         for (StreamEdge outgoing : transitiveOutEdges) {
+             connect(startNode, outgoing);
+         }
+
+         config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
+
+     } else {
+
+         Map<Integer, StreamConfig> configsOfChain = chainedConfigs.get(startNode);
+
+         if (configsOfChain == null) {
+             configsOfChain = new HashMap<Integer, StreamConfig>();
+             chainedConfigs.put(startNode, configsOfChain);
+         }
+         configsOfChain.put(current, config);
+     }
+
+     return transitiveOutEdges;
+ }
+
+ /**
+  * Builds the display name of a vertex together with the operators chained
+  * behind it.
+  *
+  * @param vertexID
+  *            ID of the vertex to name
+  * @param chainedOutputs
+  *            chainable out-edges of the vertex; the chained names of their
+  *            targets are read from {@code chainedNames}
+  * @return the plain operator name when nothing is chained,
+  *         {@code "name -> target"} for one chained output, and
+  *         {@code "name -> (t1, t2, ...)"} for several
+  */
+ private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
+     String ownName = streamGraph.getVertex(vertexID).getOperatorName();
+     int chainedCount = chainedOutputs.size();
+
+     if (chainedCount == 0) {
+         return ownName;
+     }
+
+     if (chainedCount == 1) {
+         return ownName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetID());
+     }
+
+     List<String> targetNames = new ArrayList<String>();
+     for (StreamEdge chained : chainedOutputs) {
+         targetNames.add(chainedNames.get(chained.getTargetID()));
+     }
+     return ownName + " -> (" + StringUtils.join(targetNames, ", ") + ")";
+
+ }
+
+ /**
+  * Creates the job vertex for the given stream node, registers it with the
+  * generator and the job graph, and returns a configuration backed by the
+  * vertex's own configuration object.
+  *
+  * @param vertexID
+  *            ID of the stream node; its chained name must already be in
+  *            {@code chainedNames}
+  * @return a stream config wrapping the job vertex configuration, with the
+  *         operator name set to the chained name
+  */
+ private StreamConfig createProcessingVertex(Integer vertexID) {
+
+     String chainName = chainedNames.get(vertexID);
+     AbstractJobVertex created = new AbstractJobVertex(chainName);
+     StreamNode node = streamGraph.getVertex(vertexID);
+
+     created.setInvokableClass(node.getJobVertexClass());
+
+     int parallelism = node.getParallelism();
+
+     // non-positive values are not applied to the job vertex
+     if (parallelism > 0) {
+         created.setParallelism(parallelism);
+     }
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
+     }
+
+     if (node.getInputFormat() != null) {
+         created.setInputSplitSource(node.getInputFormat());
+     }
+
+     jobVertices.put(vertexID, created);
+     builtVertices.add(vertexID);
+     jobGraph.addVertex(created);
+
+     StreamConfig vertexConfig = new StreamConfig(created.getConfiguration());
+     vertexConfig.setOperatorName(chainName);
+     return vertexConfig;
+ }
+
+ /**
+  * Copies the properties of a stream node into the given configuration and
+  * records the configuration in {@code vertexConfigs}.
+  *
+  * @param vertexID
+  *            ID of the stream node
+  * @param config
+  *            configuration to fill
+  * @param chainableOutputs
+  *            out-edges of the node that stay inside its chain
+  * @param nonChainableOutputs
+  *            out-edges of the node that leave its chain; their count is
+  *            stored as the number of outputs
+  */
+ private void setVertexConfig(Integer vertexID, StreamConfig config,
+         List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
+
+     StreamNode node = streamGraph.getVertex(vertexID);
+
+     config.setVertexID(vertexID);
+     config.setBufferTimeout(node.getBufferTimeout());
+
+     config.setTypeSerializerIn1(node.getTypeSerializerIn1());
+     config.setTypeSerializerIn2(node.getTypeSerializerIn2());
+     config.setTypeSerializerOut1(node.getTypeSerializerOut());
+
+     config.setStreamOperator(node.getOperator());
+     config.setOutputSelectorWrapper(node.getOutputSelectorWrapper());
+
+     config.setNumberOfOutputs(nonChainableOutputs.size());
+     config.setNonChainedOutputs(nonChainableOutputs);
+     config.setChainedOutputs(chainableOutputs);
+     config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+
+     // Iteration heads and tails additionally carry the loop's ID and timeout.
+     Class<? extends AbstractInvokable> invokable = node.getJobVertexClass();
+     boolean isIterationEndpoint = invokable.equals(StreamIterationHead.class)
+             || invokable.equals(StreamIterationTail.class);
+
+     if (isIterationEndpoint) {
+         config.setIterationId(streamGraph.getLoopID(vertexID));
+         config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
+     }
+
+     // Selected names are stored per target: chained outputs first, then the rest.
+     List<StreamEdge> everyOutput = new ArrayList<StreamEdge>();
+     everyOutput.addAll(chainableOutputs);
+     everyOutput.addAll(nonChainableOutputs);
+
+     for (StreamEdge out : everyOutput) {
+         config.setSelectedNames(out.getTargetID(),
+                 streamGraph.getEdge(vertexID, out.getTargetID()).getSelectedNames());
+     }
+
+     vertexConfigs.put(vertexID, config);
+ }
+
+ /**
+  * Connects the job vertex at the head of a chain to the job vertex of the
+  * edge's target and records the edge as a physical edge.
+  *
+  * <p>
+  * The input count stored in the target vertex's configuration is increased
+  * by one. An edge whose partitioner uses the FORWARD strategy is connected
+  * point-wise, any other edge all-to-all.
+  *
+  * @param headOfChain
+  *            ID of the chain head whose job vertex produces the data
+  * @param edge
+  *            stream edge leaving the chain
+  */
+ private void connect(Integer headOfChain, StreamEdge edge) {
+
+     physicalEdgesInOrder.add(edge);
+
+     Integer targetId = edge.getTargetID();
+
+     AbstractJobVertex producer = jobVertices.get(headOfChain);
+     AbstractJobVertex consumer = jobVertices.get(targetId);
+
+     // Wraps the consumer's own configuration, so the update below is stored on the vertex.
+     StreamConfig consumerConfig = new StreamConfig(consumer.getConfiguration());
+     consumerConfig.setNumberOfInputs(consumerConfig.getNumberOfInputs() + 1);
+
+     StreamPartitioner<?> partitioner = edge.getPartitioner();
+     DistributionPattern pattern = partitioner.getStrategy() == PartitioningStrategy.FORWARD
+             ? DistributionPattern.POINTWISE
+             : DistributionPattern.ALL_TO_ALL;
+     consumer.connectNewDataSetAsInput(producer, pattern);
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
+                 headOfChain, targetId);
+     }
+ }
+
+ /**
+  * Decides whether the target of the given edge can be chained to its
+  * source, i.e. executed in the same job vertex.
+  *
+  * <p>
+  * All of the following must hold:
+  * <ul>
+  * <li>the target has exactly one incoming edge,</li>
+  * <li>both the source and the target have an operator,</li>
+  * <li>the target operator's chaining strategy is ALWAYS,</li>
+  * <li>the source operator's chaining strategy is HEAD or ALWAYS,</li>
+  * <li>the edge is FORWARD-partitioned or the target's parallelism is 1,</li>
+  * <li>source and target have the same parallelism,</li>
+  * <li>chaining is enabled on the stream graph.</li>
+  * </ul>
+  *
+  * @param edge
+  *            edge between the two candidate nodes
+  * @return true if the target can be chained to the source
+  */
+ private boolean isChainable(StreamEdge edge) {
+     StreamNode upStreamVertex = edge.getSourceVertex();
+     StreamNode downStreamVertex = edge.getTargetVertex();
+
+     StreamOperator<?, ?> headOperator = upStreamVertex.getOperator();
+     StreamOperator<?, ?> outOperator = downStreamVertex.getOperator();
+
+     // Nodes may have no operator (the downstream one was already
+     // null-checked), so the upstream operator is guarded as well instead of
+     // failing with a NullPointerException.
+     return downStreamVertex.getInEdges().size() == 1
+             && outOperator != null
+             && headOperator != null
+             && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
+             && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator
+                     .getChainingStrategy() == ChainingStrategy.ALWAYS)
+             && (edge.getPartitioner().getStrategy() == PartitioningStrategy.FORWARD || downStreamVertex
+                     .getParallelism() == 1)
+             && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
+             && streamGraph.isChainingEnabled();
+ }
+
+ /**
+  * Puts every job vertex into one common slot sharing group and, for each
+  * loop of the stream graph, adds the job vertices of the loop's head and
+  * tail to a co-location group created for that loop.
+  */
+ private void setSlotSharing() {
+     SlotSharingGroup commonGroup = new SlotSharingGroup();
+
+     for (AbstractJobVertex jobVertex : jobVertices.values()) {
+         jobVertex.setSlotSharingGroup(commonGroup);
+     }
+
+     for (StreamLoop streamLoop : streamGraph.getStreamLoops()) {
+         // NOTE(review): the group is not stored anywhere in this method;
+         // confirm that addVertex alone is enough for the co-location to
+         // take effect on the vertices.
+         CoLocationGroup loopGroup = new CoLocationGroup();
+         AbstractJobVertex tailVertex = jobVertices.get(streamLoop.getTail().getID());
+         AbstractJobVertex headVertex = jobVertices.get(streamLoop.getHead().getID());
+
+         loopGroup.addVertex(headVertex);
+         loopGroup.addVertex(tailVertex);
+     }
+ }
+}