You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:29 UTC
[04/15] flink git commit: [Storm Compatibility] Maven module
restructuring and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
new file mode 100644
index 0000000..39e7a25
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.WordCountLocal;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountLocalITCase extends StormTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ WordCountLocal.main(new String[]{this.textPath, this.resultPath});
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
new file mode 100644
index 0000000..78acfe5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/wordcount/WordCountLocalNamedITCase.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.wordcount;
+
+import org.apache.flink.storm.util.StormTestBase;
+import org.apache.flink.storm.wordcount.WordCountLocalByName;
+import org.apache.flink.test.testdata.WordCountData;
+
+public class WordCountLocalNamedITCase extends StormTestBase {
+
+ protected String textPath;
+ protected String resultPath;
+
+ @Override
+ protected void preSubmit() throws Exception {
+ this.textPath = this.createTempFile("text.txt", WordCountData.TEXT);
+ this.resultPath = this.getTempDirPath("result");
+ }
+
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, this.resultPath);
+ }
+
+ @Override
+ protected void testProgram() throws Exception {
+ WordCountLocalByName.main(new String[] { this.textPath, this.resultPath });
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..0b686e5
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties b/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ed2bbcb
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/log4j.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# This file configures logging for tests executed from the IDE (root logger is OFF; raise its level to see log output)
+
+log4j.rootLogger=OFF, console
+
+# Log all infos to the console (System.err)
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target = System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml b/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..4f56748
--- /dev/null
+++ b/flink-contrib/flink-storm-examples/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/README.md
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/README.md b/flink-contrib/flink-storm/README.md
new file mode 100644
index 0000000..239780c
--- /dev/null
+++ b/flink-contrib/flink-storm/README.md
@@ -0,0 +1,15 @@
+# flink-storm
+
+`flink-storm` is a compatibility layer for Apache Storm and allows embedding Spouts or Bolts unmodified within a regular Flink streaming program (`SpoutWrapper` and `BoltWrapper`).
+Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`).
+Only a few minor changes to the original submitting code are required.
+The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
+
+The following Storm features are not (yet/fully) supported by the compatibility layer right now:
+* tuple meta information
+* fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
+* for whole Storm topologies the following is not supported by Flink:
+ * direct emit connection pattern
+ * activating/deactivating and rebalancing of topologies
+ * task hooks
+ * metrics
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/pom.xml b/flink-contrib/flink-storm/pom.xml
new file mode 100644
index 0000000..657b974
--- /dev/null
+++ b/flink-contrib/flink-storm/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-contrib-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-storm</artifactId>
+ <name>flink-storm</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ <version>0.9.4</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>logback-classic</artifactId>
+ <groupId>ch.qos.logback</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[2.9,)</versionRange>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
new file mode 100644
index 0000000..5f0ee21
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
+import org.apache.flink.storm.util.StormConfig;
+
+import scala.Some;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
+ * Flink's JobManager instead of Storm's Nimbus.
+ */
+public class FlinkClient {
+
+ /** The client's configuration */
+ private final Map<?,?> conf;
+ /** The jobmanager's host name */
+ private final String jobManagerHost;
+ /** The jobmanager's rpc port */
+ private final int jobManagerPort;
+ /** The user specified timeout, stored as a string of the form {@code "<millis> ms"}; {@code null} if none was given */
+ private final String timeout;
+
+ // The following methods are derived from "backtype.storm.utils.NimbusClient"
+
+ /**
+ * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+ * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+ *
+ * @param conf
+ * A configuration.
+ * @param host
+ * The jobmanager's host name.
+ * @param port
+ * The jobmanager's rpc port.
+ */
+ @SuppressWarnings("rawtypes")
+ public FlinkClient(final Map conf, final String host, final int port) {
+ this(conf, host, port, null);
+ }
+
+ /**
+ * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+ * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+ *
+ * @param conf
+ * A configuration.
+ * @param host
+ * The jobmanager's host name.
+ * @param port
+ * The jobmanager's rpc port.
+ * @param timeout
+ * Timeout
+ */
+ @SuppressWarnings("rawtypes")
+ public FlinkClient(final Map conf, final String host, final int port, final Integer timeout) {
+ this.conf = conf;
+ this.jobManagerHost = host;
+ this.jobManagerPort = port;
+ if (timeout != null) {
+ this.timeout = timeout + " ms";
+ } else {
+ this.timeout = null;
+ }
+ }
+
+ /**
+ * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
+ * Config#NIMBUS_THRIFT_PORT} as JobManager address.
+ *
+ * @param conf
+ * Configuration that contains the jobmanager's hostname and port.
+ * @return A configured {@link FlinkClient}.
+ */
+ @SuppressWarnings("rawtypes")
+ public static FlinkClient getConfiguredClient(final Map conf) {
+ final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+ final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
+ return new FlinkClient(conf, nimbusHost, nimbusPort);
+ }
+
+ /**
+ * Return a reference to itself.
+ * <p/>
+ * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
+ *
+ * @return A reference to itself.
+ */
+ public FlinkClient getClient() {
+ return this;
+ }
+
+ // The following methods are derived from "backtype.storm.generated.Nimbus.Client"
+
+ /**
+ * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+ * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+ */
+ public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
+ }
+
+ /**
+ * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
+ * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
+ */
+ public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
+ topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+
+ if (this.getTopologyJobId(name) != null) {
+ throw new AlreadyAliveException();
+ }
+
+ final File uploadedJarFile = new File(uploadedJarLocation);
+ try {
+ JobWithJars.checkJarFile(uploadedJarFile);
+ } catch (final IOException e) {
+ throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
+ }
+
+ /* set storm configuration */
+ if (this.conf != null) {
+ topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
+ }
+
+ final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
+ jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
+
+ final Configuration configuration = jobGraph.getJobConfiguration();
+ configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
+ configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);
+
+ final Client client;
+ try {
+ client = new Client(configuration);
+ } catch (IOException e) {
+ throw new RuntimeException("Could not establish a connection to the job manager", e);
+ }
+
+ try {
+ ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
+ Lists.newArrayList(uploadedJarFile),
+ this.getClass().getClassLoader());
+ client.runDetached(jobGraph, classLoader);
+ } catch (final ProgramInvocationException e) {
+ throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+ }
+ }
+
+ public void killTopology(final String name) throws NotAliveException {
+ this.killTopologyWithOpts(name, null);
+ }
+
+ public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
+ final JobID jobId = this.getTopologyJobId(name);
+ if (jobId == null) {
+ throw new NotAliveException();
+ }
+
+ try {
+ final ActorRef jobManager = this.getJobManager();
+
+ if (options != null) {
+ try {
+ Thread.sleep(1000 * options.get_wait_secs());
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ final FiniteDuration askTimeout = this.getTimeout();
+ final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
+ try {
+ Await.result(response, askTimeout);
+ } catch (final Exception e) {
+ throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+ + ":" + this.jobManagerPort, e);
+ }
+ }
+
+ // Flink specific additional methods
+
+ /**
+ * Package internal method to get a Flink {@link JobID} from a Storm topology name.
+ *
+ * @param id
+ * The Storm topology name.
+ * @return Flink's internally used {@link JobID}.
+ */
+ JobID getTopologyJobId(final String id) {
+ final Configuration configuration = GlobalConfiguration.getConfiguration();
+ if (this.timeout != null) {
+ configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+ }
+
+ try {
+ final ActorRef jobManager = this.getJobManager();
+
+ final FiniteDuration askTimeout = this.getTimeout();
+ final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
+ new Timeout(askTimeout));
+
+ Object result;
+ try {
+ result = Await.result(response, askTimeout);
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
+ }
+
+ if (result instanceof RunningJobsStatus) {
+ final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+
+ for (final JobStatusMessage status : jobs) {
+ if (status.getJobName().equals(id)) {
+ return status.getJobId();
+ }
+ }
+ } else {
+ throw new RuntimeException("ReqeustRunningJobs requires a response of type "
+ + "RunningJobs. Instead the response is of type " + result.getClass() + ".");
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
+ + ":" + this.jobManagerPort, e);
+ }
+
+ return null;
+ }
+
+ private FiniteDuration getTimeout() {
+ final Configuration configuration = GlobalConfiguration.getConfiguration();
+ if (this.timeout != null) {
+ configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
+ }
+
+ return AkkaUtils.getTimeout(configuration);
+ }
+
+ private ActorRef getJobManager() throws IOException {
+ final Configuration configuration = GlobalConfiguration.getConfiguration();
+
+ ActorSystem actorSystem;
+ try {
+ final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
+ actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
+ systemEndpoint));
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
+ }
+
+ return JobManager.getJobManagerActorRef(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
+ actorSystem, AkkaUtils.getLookupTimeout(configuration));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
new file mode 100644
index 0000000..868801b
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkLocalCluster.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.LocalCluster;
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.generated.TopologyInfo;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.StreamingMode;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.storm.util.StormConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
+ */
+public class FlinkLocalCluster {
+
+ /** The log used by this mini cluster */
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);
+
+ /** The flink mini cluster on which to execute the programs */
+ private final FlinkMiniCluster flink;
+
+
+ public FlinkLocalCluster() {
+ this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
+ this.flink.start();
+ }
+
+ public FlinkLocalCluster(FlinkMiniCluster flink) {
+ this.flink = Objects.requireNonNull(flink);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
+ throws Exception {
+ this.submitTopologyWithOpts(topologyName, conf, topology, null);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
+ LOG.info("Running Storm topology on FlinkLocalCluster");
+
+ if(conf != null) {
+ topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
+ }
+
+ JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
+ this.flink.submitJobDetached(jobGraph);
+ }
+
+ public void killTopology(final String topologyName) {
+ this.killTopologyWithOpts(topologyName, null);
+ }
+
+ public void killTopologyWithOpts(final String name, final KillOptions options) {
+ }
+
+ public void activate(final String topologyName) {
+ }
+
+ public void deactivate(final String topologyName) {
+ }
+
+ public void rebalance(final String name, final RebalanceOptions options) {
+ }
+
+ public void shutdown() {
+ flink.stop();
+ }
+
+ public String getTopologyConf(final String id) {
+ return null;
+ }
+
+ public StormTopology getTopology(final String id) {
+ return null;
+ }
+
+ public ClusterSummary getClusterInfo() {
+ return null;
+ }
+
+ public TopologyInfo getTopologyInfo(final String id) {
+ return null;
+ }
+
+ public Map<?, ?> getState() {
+ return null;
+ }
+
+ // ------------------------------------------------------------------------
+ // Access to default local cluster
+ // ------------------------------------------------------------------------
+
+ // The factory used to create {@link FlinkLocalCluster}s; a different one can be set via initialize(), e.g., for ITCases
+ private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();
+
+ /**
+ * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
+ * {@link #initialize(LocalClusterFactory)} in advance, a new {@link FlinkLocalCluster} is returned.
+ *
+ * @return a {@link FlinkLocalCluster} to be used for execution
+ */
+ public static FlinkLocalCluster getLocalCluster() {
+ return currentFactory.createLocalCluster();
+ }
+
+ /**
+ * Sets a different factory for FlinkLocalClusters to be used for execution.
+ *
+ * @param clusterFactory
+ * The LocalClusterFactory to create the local clusters for execution.
+ */
+ public static void initialize(LocalClusterFactory clusterFactory) {
+ currentFactory = Objects.requireNonNull(clusterFactory);
+ }
+
+ // ------------------------------------------------------------------------
+ // Cluster factory
+ // ------------------------------------------------------------------------
+
+ /**
+ * A factory that creates local clusters.
+ */
+ public static interface LocalClusterFactory {
+
+ /**
+ * Creates a local flink cluster.
+ * @return A local flink cluster.
+ */
+ FlinkLocalCluster createLocalCluster();
+ }
+
+ /**
+ * A factory that instantiates a FlinkLocalCluster.
+ */
+ public static class DefaultLocalClusterFactory implements LocalClusterFactory {
+
+ @Override
+ public FlinkLocalCluster createLocalCluster() {
+ return new FlinkLocalCluster();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
new file mode 100644
index 0000000..88d2dfe
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkOutputFieldsDeclarer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
+ * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
+ * <br />
+ * <strong>CAUTION: Flink does not support direct emit.</strong>
+ */
+final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
+
+ /** The declared output streams and schemas. */
+ final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
+
+ @Override
+ public void declare(final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+ *
+ * @throws UnsupportedOperationException
+ * if {@code direct} is {@code true}
+ */
+ @Override
+ public void declare(final boolean direct, final Fields fields) {
+ this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
+ }
+
+ @Override
+ public void declareStream(final String streamId, final Fields fields) {
+ this.declareStream(streamId, false, fields);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
+ *
+ * @throws UnsupportedOperationException
+ * if {@code direct} is {@code true}
+ */
+ @Override
+ public void declareStream(final String streamId, final boolean direct, final Fields fields) {
+ if (direct) {
+ throw new UnsupportedOperationException("Direct emit is not supported by Flink");
+ }
+
+ this.outputStreams.put(streamId, fields);
+ }
+
+ /**
+ * Returns {@link TypeInformation} for the declared output schema for a specific stream.
+ *
+ * @param streamId
+ * A stream ID.
+ *
+ * @return output type information for the declared output schema of the specified stream; or {@code null} if
+ * {@code streamId == null}
+ *
+ * @throws IllegalArgumentException
+ * If no output schema was declared for the specified stream or if more than 25 attributes got declared.
+ */
+ TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
+ if (streamId == null) {
+ return null;
+ }
+
+ Fields outputSchema = this.outputStreams.get(streamId);
+ if (outputSchema == null) {
+ throw new IllegalArgumentException("Stream with ID '" + streamId
+ + "' was not declared.");
+ }
+
+ Tuple t;
+ final int numberOfAttributes = outputSchema.size();
+
+ if (numberOfAttributes == 1) {
+ return TypeExtractor.getForClass(Object.class);
+ } else if (numberOfAttributes <= 25) {
+ try {
+ t = Tuple.getTupleClass(numberOfAttributes).newInstance();
+ } catch (final InstantiationException e) {
+ throw new RuntimeException(e);
+ } catch (final IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
+ }
+
+ // TODO: declare only key fields as DefaultComparable
+ for (int i = 0; i < numberOfAttributes; ++i) {
+ t.setField(new DefaultComparable(), i);
+ }
+
+ return TypeExtractor.getForObject(t);
+ }
+
+ /**
+ * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+ * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType(String)}. If key fields are not
+ * comparable, Flink cannot use them and will throw an exception.
+ */
+ private static class DefaultComparable implements Comparable<DefaultComparable> {
+
+ public DefaultComparable() {
+ }
+
+ @Override
+ public int compareTo(final DefaultComparable o) {
+ return 0;
+ }
+ }
+
+ /**
+ * Computes the indexes within the declared output schema of the specified stream, for a list of given
+ * field-grouping attributes.
+ *
+ * @param streamId
+ * A stream ID; the stream must have been declared, otherwise a {@link NullPointerException} is thrown.
+ * @param groupingFields
+ * The names of the key fields.
+ *
+ * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
+ * list
+ */
+ int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
+ final int[] fieldIndexes = new int[groupingFields.size()];
+
+ for (int i = 0; i < fieldIndexes.length; ++i) {
+ fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
+ }
+
+ return fieldIndexes;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
new file mode 100644
index 0000000..9b03c68
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
+ */
+public class FlinkSubmitter {
+ public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
+ *
+ * @param name
+ * the name of the topology.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}.
+ * @param topology
+ * the processing to execute.
+ * @param opts
+ * to manipulate the starting of the topology (currently ignored by this implementation).
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
+ final SubmitOptions opts)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology);
+ }
+
+ /**
+ * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. If not set in
+ * {@code stormConf}, Nimbus host and port default to the JobManager address and port of Flink's global configuration.
+ *
+ * @param name
+ * the name of the topology.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}. Missing Nimbus entries are added to this map.
+ * @param topology
+ * the processing to execute.
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ if (!Utils.isValidConf(stormConf)) {
+ throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
+ }
+
+ final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+ if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
+ stormConf.put(Config.NIMBUS_HOST,
+ flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+ }
+ if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+ stormConf.put(Config.NIMBUS_THRIFT_PORT,
+ new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
+ 6123)));
+ }
+
+ final String serConf = JSONValue.toJSONString(stormConf);
+
+ final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
+ try {
+ if (client.getTopologyJobId(name) != null) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+ String localJar = System.getProperty("storm.jar");
+ if (localJar == null) {
+ try {
+ for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+ .getJars()) {
+ // TODO verify that there is only one jar
+ localJar = file.getAbsolutePath();
+ }
+ } catch (final ClassCastException e) {
+ // ignore: the environment is not a ContextEnvironment, so no jar can be derived; localJar stays null
+ }
+ }
+
+ logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
+ client.submitTopologyWithOpts(name, localJar, topology);
+ } catch (final InvalidTopologyException e) {
+ logger.warn("Topology submission exception: " + e.get_msg());
+ throw e;
+ } catch (final AlreadyAliveException e) {
+ logger.warn("Topology already alive exception", e);
+ throw e;
+ }
+
+ logger.info("Finished submitting topology: " + name);
+ }
+
+ /**
+ * Same as {@link #submitTopology(String, Map, FlinkTopology)}. Progress bars are not supported by
+ * Flink.
+ *
+ * @param name
+ * the name of the topology.
+ * @param stormConf
+ * the topology-specific configuration. See {@link Config}.
+ * @param topology
+ * the processing to execute.
+ * @throws AlreadyAliveException
+ * if a topology with this name is already running
+ * @throws InvalidTopologyException
+ * if an invalid topology was submitted
+ */
+ public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
+ final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology);
+ }
+
+ /**
+ * In Flink, jar files are submitted directly when a program is started. Thus, this method uploads nothing. The
+ * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+ * environment.
+ *
+ * @param conf
+ * the topology-specific configuration. See {@link Config}. Ignored by this method.
+ * @param localJar
+ * file path of the jar file to submit
+ * @return the value of parameter localJar
+ */
+ @SuppressWarnings("rawtypes")
+ public static String submitJar(final Map conf, final String localJar) {
+ return submitJar(localJar);
+ }
+
+ /**
+ * In Flink, jar files are submitted directly when a program is started. Thus, this method uploads nothing. The
+ * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
+ * environment.
+ *
+ * @param localJar
+ * file path of the jar file to submit; a {@code null} value results in a {@link RuntimeException}
+ * @return the value of parameter localJar
+ */
+ public static String submitJar(final String localJar) {
+ if (localJar == null) {
+ throw new RuntimeException(
+ "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
+ "to upload");
+ }
+
+ return localJar;
+ }
+
+ /**
+ * Dummy interface used to track progress of file upload. Does not do anything. Kept for compatibility.
+ */
+ public interface FlinkProgressListener {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
new file mode 100644
index 0000000..531d6df
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopology.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.api;
+
+import backtype.storm.generated.StormTopology;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/**
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
+ * StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
+ * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+ * {@link FlinkClient}.
+ */
+public class FlinkTopology extends StreamExecutionEnvironment {
+
+ /** The number of declared tasks for the whole program (i.e., sum over all dops) */
+ private int numberOfTasks = 0;
+
+ public FlinkTopology() {
+ // Set default parallelism to 1, to mirror Storm default behavior
+ super.setParallelism(1);
+ }
+
+ /**
+ * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
+ * FlinkClient}.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public JobExecutionResult execute() throws Exception {
+ throw new UnsupportedOperationException(
+ "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+ "instead.");
+ }
+
+ /**
+ * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
+ * FlinkClient}.
+ *
+ * @throws UnsupportedOperationException
+ * at every invocation
+ */
+ @Override
+ public JobExecutionResult execute(final String jobName) throws Exception {
+ throw new UnsupportedOperationException(
+ "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+ "instead.");
+ }
+
+ /**
+ * Increases the number of declared tasks of this program by the given value.
+ *
+ * @param dop
+ * The dop of a new operator that increases the number of overall tasks; positivity is only asserted.
+ */
+ public void increaseNumberOfTasks(final int dop) {
+ assert (dop > 0);
+ this.numberOfTasks += dop;
+ }
+
+ /**
+ * Return the number of required tasks to execute this program.
+ *
+ * @return the number of required tasks to execute this program
+ */
+ public int getNumberOfTasks() {
+ return this.numberOfTasks;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
new file mode 100644
index 0000000..99de0e2
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkTopologyBuilder.java
@@ -0,0 +1,397 @@
+/*
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.api;
+
+import backtype.storm.generated.ComponentCommon;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.IRichStateSpout;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.storm.util.SplitStreamType;
+import org.apache.flink.storm.util.SplitStreamTypeKeySelector;
+import org.apache.flink.storm.util.StormStreamSelector;
+import org.apache.flink.storm.wrappers.BoltWrapper;
+import org.apache.flink.storm.wrappers.SpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
+ * topology. Most methods (except {@link #createTopology()}) are copied from the original {@link TopologyBuilder}
+ * implementation to ensure equal behavior.<br />
+ * <br />
+ * <strong>CAUTION: {@link IRichStateSpout StateSpout}s are currently not supported.</strong>
+ */
+public class FlinkTopologyBuilder {
+
+ /** A Storm {@link TopologyBuilder} to build a real Storm topology */
+ private final TopologyBuilder stormBuilder = new TopologyBuilder();
+ /** All user spouts by their ID */
+ private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
+ /** All user bolts by their ID */
+ private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
+ /** All declared streams and output schemas by operator ID */
+ private final HashMap<String, HashMap<String, Fields>> outputStreams = new HashMap<String, HashMap<String, Fields>>();
+ /** All spouts&bolts declarers by their ID */
+ private final HashMap<String, FlinkOutputFieldsDeclarer> declarers = new HashMap<String, FlinkOutputFieldsDeclarer>();
+ // needs to be a class member for internal testing purpose
+ private StormTopology stormTopology;
+
+
+ /**
+ * Creates a Flink program that uses the specified spouts and bolts.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public FlinkTopology createTopology() {
+ this.stormTopology = this.stormBuilder.createTopology();
+
+ final FlinkTopology env = new FlinkTopology();
+ env.setParallelism(1);
+
+ final HashMap<String, HashMap<String, DataStream>> availableInputs = new HashMap<String, HashMap<String, DataStream>>();
+
+ for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+ final String spoutId = spout.getKey();
+ final IRichSpout userSpout = spout.getValue();
+
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ userSpout.declareOutputFields(declarer);
+ final HashMap<String,Fields> sourceStreams = declarer.outputStreams;
+ this.outputStreams.put(spoutId, sourceStreams);
+ declarers.put(spoutId, declarer);
+
+ final SpoutWrapper spoutWrapper = new SpoutWrapper(userSpout);
+ spoutWrapper.setStormTopology(stormTopology);
+
+ DataStreamSource source;
+ final HashMap<String, DataStream> outputStreams = new HashMap<String, DataStream>();
+ if (sourceStreams.size() == 1) {
+ final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
+ source = env.addSource(spoutWrapper, spoutId,
+ declarer.getOutputType(outputStreamId));
+ outputStreams.put(outputStreamId, source);
+ } else {
+ source = env.addSource(spoutWrapper, spoutId,
+ TypeExtractor.getForClass(SplitStreamType.class));
+ SplitStream splitSource = source.split(new StormStreamSelector());
+
+ for (String streamId : sourceStreams.keySet()) {
+ outputStreams.put(streamId, splitSource.select(streamId));
+ }
+ }
+ availableInputs.put(spoutId, outputStreams);
+
+ int dop = 1;
+ final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
+ if (common.is_set_parallelism_hint()) {
+ dop = common.get_parallelism_hint();
+ source.setParallelism(dop);
+ } else {
+ common.set_parallelism_hint(1);
+ }
+ env.increaseNumberOfTasks(dop);
+ }
+
+ final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
+ unprocessedBolts.putAll(this.bolts);
+
+ final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+ new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+ /* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+ * its producer
+ * ->thus, we might need to repeat multiple times
+ */
+ boolean makeProgress = true;
+ while (unprocessedBolts.size() > 0) {
+ if (!makeProgress) {
+ throw new RuntimeException(
+ "Unable to build Topology. Could not connect the following bolts: "
+ + unprocessedBolts.keySet());
+ }
+ makeProgress = false;
+
+ final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
+ while (boltsIterator.hasNext()) {
+
+ final Entry<String, IRichBolt> bolt = boltsIterator.next();
+ final String boltId = bolt.getKey();
+ final IRichBolt userBolt = bolt.getValue();
+
+ final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
+
+ Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
+ if (unprocessedInputs == null) {
+ unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
+ unprocessedInputs.addAll(common.get_inputs().entrySet());
+ unprocessdInputsPerBolt.put(boltId, unprocessedInputs);
+ }
+
+ // connect each available producer to the current bolt
+ final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
+ while (inputStreamsIterator.hasNext()) {
+
+ final Entry<GlobalStreamId, Grouping> stormInputStream = inputStreamsIterator.next();
+ final String producerId = stormInputStream.getKey().get_componentId();
+ final String inputStreamId = stormInputStream.getKey().get_streamId();
+
+ final HashMap<String, DataStream> producer = availableInputs.get(producerId);
+ if (producer != null) {
+ makeProgress = true;
+
+ DataStream inputStream = producer.get(inputStreamId);
+ if (inputStream != null) {
+ final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
+ userBolt.declareOutputFields(declarer);
+ final HashMap<String, Fields> boltOutputStreams = declarer.outputStreams;
+ this.outputStreams.put(boltId, boltOutputStreams);
+ this.declarers.put(boltId, declarer);
+
+ // if producer was processed already
+ final Grouping grouping = stormInputStream.getValue();
+ if (grouping.is_set_shuffle()) {
+ // Storm uses a round-robin shuffle strategy
+ inputStream = inputStream.rebalance();
+ } else if (grouping.is_set_fields()) {
+ // global grouping is emulated in Storm via an empty fields grouping list
+ final List<String> fields = grouping.get_fields();
+ if (fields.size() > 0) {
+ FlinkOutputFieldsDeclarer prodDeclarer = this.declarers.get(producerId);
+ if (producer.size() == 1) {
+ inputStream = inputStream.keyBy(prodDeclarer
+ .getGroupingFieldIndexes(inputStreamId,
+ grouping.get_fields()));
+ } else {
+ inputStream = inputStream
+ .keyBy(new SplitStreamTypeKeySelector(
+ prodDeclarer.getGroupingFieldIndexes(
+ inputStreamId,
+ grouping.get_fields())));
+ }
+ } else {
+ inputStream = inputStream.global();
+ }
+ } else if (grouping.is_set_all()) {
+ inputStream = inputStream.broadcast();
+ } else if (!grouping.is_set_local_or_shuffle()) {
+ throw new UnsupportedOperationException(
+ "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+ }
+
+ final SingleOutputStreamOperator outputStream;
+ final BoltWrapper boltWrapper;
+ if (boltOutputStreams.size() < 2) { // single output stream or sink
+ String outputStreamId = null;
+ if (boltOutputStreams.size() == 1) {
+ outputStreamId = (String) boltOutputStreams.keySet().toArray()[0];
+ }
+ final TypeInformation<?> outType = declarer
+ .getOutputType(outputStreamId);
+
+ boltWrapper = new BoltWrapper(userBolt, this.outputStreams
+ .get(producerId).get(inputStreamId));
+ outputStream = inputStream.transform(boltId, outType, boltWrapper);
+
+ if (outType != null) {
+ // only for non-sink nodes
+ final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+ op.put(outputStreamId, outputStream);
+ availableInputs.put(boltId, op);
+ }
+ } else {
+ final TypeInformation<?> outType = TypeExtractor
+ .getForClass(SplitStreamType.class);
+
+ boltWrapper = new BoltWrapper(userBolt, this.outputStreams.get(producerId).get(inputStreamId));
+ outputStream = inputStream.transform(boltId, outType, boltWrapper);
+
+ final SplitStream splitStreams = outputStream
+ .split(new StormStreamSelector());
+
+ final HashMap<String, DataStream> op = new HashMap<String, DataStream>();
+ for (String outputStreamId : boltOutputStreams.keySet()) {
+ op.put(outputStreamId, splitStreams.select(outputStreamId));
+ }
+ availableInputs.put(boltId, op);
+ }
+ boltWrapper.setStormTopology(stormTopology);
+
+ int dop = 1;
+ if (common.is_set_parallelism_hint()) {
+ dop = common.get_parallelism_hint();
+ outputStream.setParallelism(dop);
+ } else {
+ common.set_parallelism_hint(1);
+ }
+ env.increaseNumberOfTasks(dop);
+
+ inputStreamsIterator.remove();
+ } else {
+ throw new RuntimeException("Cannot connect '" + boltId + "' to '"
+ + producerId + "'. Stream '" + inputStreamId + "' not found.");
+ }
+ }
+ }
+
+ if (unprocessedInputs.size() == 0) {
+ // all inputs are connected; processing bolt completed
+ boltsIterator.remove();
+ }
+ }
+ }
+ return env;
+ }
+
+ /**
+ * Define a new bolt in this topology with parallelism of just one thread.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
+ * @param bolt
+ * the bolt
+ * @return use the returned object to declare the inputs to this component
+ */
+ public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
+ return this.setBolt(id, bolt, null);
+ }
+
+ /**
+ * Define a new bolt in this topology with the specified amount of parallelism.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
+ * @param bolt
+ * the bolt
+ * @param parallelism_hint
+ * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+ * process somewhere around the cluster.
+ * @return use the returned object to declare the inputs to this component
+ */
+ public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
+ final BoltDeclarer declarer = this.stormBuilder.setBolt(id, bolt, parallelism_hint);
+ this.bolts.put(id, bolt);
+ return declarer;
+ }
+
+ /**
+ * Define a new bolt in this topology. This defines a basic bolt, which is simpler to use but a more restricted
+ * kind
+ * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+ * achieve proper reliability in the topology.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
+ * @param bolt
+ * the basic bolt
+ * @return use the returned object to declare the inputs to this component
+ */
+ public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
+ return this.setBolt(id, bolt, null);
+ }
+
+ /**
+ * Define a new bolt in this topology. This defines a basic bolt, which is simpler to use but a more restricted
+ * kind
+ * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
+ * achieve proper reliability in the topology.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
+ * @param bolt
+ * the basic bolt
+ * @param parallelism_hint
+ * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+ * process somewhere around the cluster.
+ * @return use the returned object to declare the inputs to this component
+ */
+ public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
+ return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
+ }
+
+ /**
+ * Define a new spout in this topology.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this spout's
+ * outputs.
+ * @param spout
+ * the spout
+ */
+ public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
+ return this.setSpout(id, spout, null);
+ }
+
+ /**
+ * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
+ * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
+ *
+ * @param id
+ * the id of this component. This id is referenced by other components that want to consume this spout's
+ * outputs.
+ * @param parallelism_hint
+ * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+ * process somewhere around the cluster.
+ * @param spout
+ * the spout
+ */
+ public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
+ final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
+ this.spouts.put(id, spout);
+ return declarer;
+ }
+
+ // TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+ /* not implemented by Storm 0.9.4
+ * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+ * this.stormBuilder.setStateSpout(id, stateSpout);
+ * }
+ * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+ * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+ * }
+ */
+
+ // for internal testing purpose only
+ StormTopology getStormTopology() {
+ return this.stormTopology;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
new file mode 100644
index 0000000..99c2583
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/FiniteSpout.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.storm.util;
+
+import backtype.storm.topology.IRichSpout;
+
+/**
+ * A spout that emits only a finite number of records. Common spouts emit infinite streams by default; a spout that
+ * implements this interface changes this behavior and can take advantage of Flink's finite-source capabilities.
+ */
+public interface FiniteSpout extends IRichSpout {
+
+ /**
+ * Tells whether this spout has reached the end of its stream.
+ *
+ * @return {@code true} if the spout's stream reached its end, {@code false} otherwise
+ */
+ boolean reachedEnd();
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
new file mode 100644
index 0000000..1fb5e02
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamMapper.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+
+/**
+ * Map function that removes the {@link SplitStreamType}{@code <T>} wrapper, ie, it returns the wrapped record of type
+ * {@code T}. Can be used to get a "clean" stream from a Spout/Bolt that declared multiple output streams (after the
+ * streams got separated using
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
+ * {@link SplitStream#select(String...) .select(...)}).
+ *
+ * @param <T>
+ * the type of the wrapped record
+ */
+public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
+ private static final long serialVersionUID = 3550359150160908564L;
+
+ /**
+ * Extracts the wrapped record.
+ *
+ * @param value
+ * the wrapper to unpack
+ * @return the content of the wrapper's {@code value} field
+ */
+ @Override
+ public T map(SplitStreamType<T> value) throws Exception {
+ final T unwrapped = value.value;
+ return unwrapped;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
new file mode 100644
index 0000000..a4b5f8e
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamType.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * Used by {@link org.apache.flink.storm.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
+ * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
+ * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
+ * {@link StormStreamSelector}.
+ */
+public class SplitStreamType<T> {
+
+ /** The stream ID this tuple belongs to. */
+ public String streamId;
+ /** The actual data value. */
+ public T value;
+
+ @Override
+ public String toString() {
+ return "<sid:" + this.streamId + ",v:" + this.value + ">";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SplitStreamType<?> other = (SplitStreamType<?>) o;
+
+ return this.streamId.equals(other.streamId) && this.value.equals(other.value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
new file mode 100644
index 0000000..44c693c
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/SplitStreamTypeKeySelector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
+
+/**
+ * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
+ * {@link StormStreamSelector} from a Spout or Bolt that declares multiple output streams.
+ * <p>
+ * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
+ * {@link ArrayKeySelector} on it.
+ */
+public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
+ private static final long serialVersionUID = 4672434660037669254L;
+
+ /** The key selector that is applied to the unwrapped tuple. */
+ private final ArrayKeySelector<Tuple> selector;
+
+ /**
+ * Creates a key selector for the given fields.
+ *
+ * @param fields
+ * the fields handed to the internally used {@link ArrayKeySelector}
+ */
+ public SplitStreamTypeKeySelector(int... fields) {
+ this.selector = new ArrayKeySelector<Tuple>(fields);
+ }
+
+ /**
+ * Unwraps the given record and applies the internal {@link ArrayKeySelector} to the wrapped tuple.
+ *
+ * @param value
+ * the wrapped tuple
+ * @return the key computed by the internal selector for the wrapped tuple
+ */
+ @Override
+ public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
+ final Tuple wrappedTuple = value.value;
+ return this.selector.getKey(wrappedTuple);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
new file mode 100644
index 0000000..6550990
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
+
+import backtype.storm.Config;
+
+/**
+ * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
+ * object) for embedded Spouts and Bolts. Every {@link Map} operation is forwarded to an internal {@link HashMap}.
+ */
+@SuppressWarnings("rawtypes")
+public final class StormConfig extends GlobalJobParameters implements Map {
+ private static final long serialVersionUID = 8019519109673698490L;
+
+ /** Backing map that contains the actual configuration that is provided to Spouts and Bolts. */
+ private final Map config = new HashMap();
+
+ /**
+ * Creates an empty configuration.
+ */
+ public StormConfig() {
+ // nothing to do: the backing map starts out empty
+ }
+
+ /**
+ * Creates a configuration with initial values provided by the given {@code Map}.
+ *
+ * @param config
+ * Initial values for this configuration; they are copied into the backing map.
+ */
+ @SuppressWarnings("unchecked")
+ public StormConfig(Map config) {
+ // "this." is required here because the parameter shadows the field
+ this.config.putAll(config);
+ }
+
+
+ /** Returns the number of entries of the backing map. */
+ @Override
+ public int size() {
+ return config.size();
+ }
+
+ /** Returns whether the backing map is empty. */
+ @Override
+ public boolean isEmpty() {
+ return config.isEmpty();
+ }
+
+ /** Returns whether the backing map contains the given key. */
+ @Override
+ public boolean containsKey(Object key) {
+ return config.containsKey(key);
+ }
+
+ /** Returns whether the backing map contains the given value. */
+ @Override
+ public boolean containsValue(Object value) {
+ return config.containsValue(value);
+ }
+
+ /** Returns the value the backing map holds for the given key. */
+ @Override
+ public Object get(Object key) {
+ return config.get(key);
+ }
+
+ /** Puts the given key/value pair into the backing map and returns the backing map's result. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Object put(Object key, Object value) {
+ return config.put(key, value);
+ }
+
+ /** Removes the given key from the backing map and returns the backing map's result. */
+ @Override
+ public Object remove(Object key) {
+ return config.remove(key);
+ }
+
+ /** Copies all entries of the given map into the backing map. */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void putAll(Map m) {
+ config.putAll(m);
+ }
+
+ /** Clears the backing map. */
+ @Override
+ public void clear() {
+ config.clear();
+ }
+
+ /** Returns the key set of the backing map (not a copy). */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Set<Object> keySet() {
+ return config.keySet();
+ }
+
+ /** Returns the value collection of the backing map (not a copy). */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Collection<Object> values() {
+ return config.values();
+ }
+
+ /** Returns the entry set of the backing map (not a copy). */
+ @SuppressWarnings("unchecked")
+ @Override
+ public Set<Map.Entry<Object, Object>> entrySet() {
+ return config.entrySet();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
new file mode 100644
index 0000000..d9f4178
--- /dev/null
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/util/StormStreamSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.storm.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.flink.storm.api.FlinkTopologyBuilder;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
+/**
+ * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink. Each record is routed
+ * to the single output name that equals the record's stream ID.
+ */
+public final class StormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
+ private static final long serialVersionUID = 2553423379715401023L;
+
+ /** internal cache (stream ID to single-element list holding that ID) to avoid short living ArrayList objects. */
+ private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
+
+ /**
+ * Selects the output for the given record.
+ *
+ * @param value
+ * the wrapped record whose {@code streamId} determines the output
+ * @return a list that contains exactly the record's stream ID; the same list instance is handed out again for
+ * later records with the same stream ID
+ */
+ @Override
+ public Iterable<String> select(SplitStreamType<T> value) {
+ final String sid = value.streamId;
+
+ final List<String> cached = this.streams.get(sid);
+ if (cached != null) {
+ return cached;
+ }
+
+ // first record of this stream: create the list once and remember it
+ final List<String> created = new ArrayList<String>(1);
+ created.add(sid);
+ this.streams.put(sid, created);
+ return created;
+ }
+
+}
\ No newline at end of file