You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:02 UTC
[12/27] flink git commit: [storm-compat] Storm compatibility code
cleanup
[storm-compat] Storm compatibility code cleanup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e497a831
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e497a831
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e497a831
Branch: refs/heads/master
Commit: e497a831fab3c01ff8dc5992940b09e6427fa472
Parents: 9ff3cf0
Author: szape <ne...@gmail.com>
Authored: Thu May 21 10:15:20 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Jun 14 23:00:00 2015 +0200
----------------------------------------------------------------------
.../stormcompatibility/api/FlinkClient.java | 269 +++++++++----------
.../api/FlinkLocalCluster.java | 87 +++---
.../api/FlinkOutputFieldsDeclarer.java | 115 ++++----
.../stormcompatibility/api/FlinkSubmitter.java | 216 ++++++---------
.../stormcompatibility/api/FlinkTopology.java | 82 +++---
.../api/FlinkTopologyBuilder.java | 262 +++++++++---------
.../api/FlinkTopologyContext.java | 114 ++++----
.../wrappers/AbstractStormSpoutWrapper.java | 76 +++---
.../wrappers/FlinkDummyRichFunction.java | 25 +-
.../wrappers/StormBoltWrapper.java | 87 +++---
.../wrappers/StormCollector.java | 123 ++++-----
.../wrappers/StormFiniteSpoutWrapper.java | 91 +++----
.../wrappers/StormOutputFieldsDeclarer.java | 37 ++-
.../wrappers/StormSpoutWrapper.java | 47 ++--
.../stormcompatibility/wrappers/StormTuple.java | 127 +++++----
.../wrappers/StormWrapperSetupHelper.java | 95 ++++---
.../api/FlinkOutputFieldsDeclarerTest.java | 111 ++++----
.../api/FlinkTopologyContextTest.java | 34 ++-
.../api/FlinkTopologyTest.java | 25 +-
.../stormcompatibility/util/AbstractTest.java | 13 +-
.../wrappers/FiniteTestSpout.java | 43 ++-
.../wrappers/FlinkDummyRichFunctionTest.java | 15 +-
.../wrappers/StormBoltWrapperTest.java | 127 ++++-----
.../wrappers/StormCollectorTest.java | 79 +++---
.../wrappers/StormFiniteSpoutWrapperTest.java | 64 +++--
.../wrappers/StormOutputFieldsDeclarerTest.java | 38 ++-
.../wrappers/StormSpoutWrapperTest.java | 30 +--
.../wrappers/StormTupleTest.java | 190 +++++++------
.../wrappers/StormWrapperSetupHelperTest.java | 83 +++---
.../wrappers/TestCollector.java | 17 +-
.../src/assembly/word-count-storm.xml | 94 +++----
.../util/AbstractStormBoltSink.java | 83 ++++++
.../util/AbstractStormSpout.java | 70 +++++
.../util/OutputFormatter.java | 25 ++
.../util/StormBoltFileSink.java | 72 +++++
.../util/StormBoltPrintSink.java | 45 ++++
.../wordcount/BoltTokenizerWordCount.java | 79 +++---
.../wordcount/SpoutSourceWordCount.java | 96 ++++---
.../wordcount/StormWordCountLocal.java | 43 ++-
.../wordcount/StormWordCountRemoteByClient.java | 55 ++--
.../StormWordCountRemoteBySubmitter.java | 58 ++--
.../wordcount/WordCountTopology.java | 76 +++---
.../stormoperators/AbstractStormBoltSink.java | 80 ------
.../stormoperators/AbstractStormSpout.java | 75 ------
.../stormoperators/StormBoltCounter.java | 49 ++--
.../stormoperators/StormBoltFileSink.java | 77 ------
.../stormoperators/StormBoltPrintSink.java | 43 ---
.../stormoperators/StormBoltTokenizer.java | 38 ++-
.../stormoperators/StormFileSpout.java | 43 ++-
.../stormoperators/StormInMemorySpout.java | 19 +-
.../WordCountOutputFormatter.java | 41 +++
.../api/FlinkTestCluster.java | 63 ++---
.../wordcount/BoltTokenizerWordCountITCase.java | 18 +-
.../wordcount/SpoutSourceWordCountITCase.java | 18 +-
.../wordcount/StormWordCountLocalITCase.java | 18 +-
55 files changed, 1919 insertions(+), 2181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 4efe02f..0f11d63 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -15,16 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -40,282 +45,258 @@ import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-
-
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
/**
* {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
* Flink's JobManager instead of Storm's Nimbus.
*/
public class FlinkClient {
- /**
- * The jobmanager's host name.
- */
+
+ //The jobmanager's host name
private final String jobManagerHost;
- /**
- * The jobmanager's rpc port.
- */
+ //The jobmanager's rpc port
private final int jobManagerPort;
- /**
- * The user specified timeout in milliseconds.
- */
+ //The user specified timeout in milliseconds
private final String timeout;
-
-
-
- /*
- * The following methods are derived from "backtype.storm.utils.NimbusClient"
- */
-
+
+ // The following methods are derived from "backtype.storm.utils.NimbusClient"
+
/**
- * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for
- * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
- *
+ * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
+ * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+ *
* @param conf
- * A configuration.
+ * A configuration.
* @param host
- * The jobmanager's host name.
+ * The jobmanager's host name.
* @param port
- * The jobmanager's rpc port.
+ * The jobmanager's rpc port.
*/
- public FlinkClient(final Map<?, ?> conf, final String host, final int port) {
- this(conf, host, port, null);
+ public FlinkClient(final String host, final int port) {
+ this(host, port, null);
}
-
+
/**
- * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for
- * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
- *
+ * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
+ * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+ *
* @param conf
- * A configuration.
+ * A configuration.
* @param host
- * The jobmanager's host name.
+ * The jobmanager's host name.
* @param port
- * The jobmanager's rpc port.
+ * The jobmanager's rpc port.
* @param timeout
+ * Timeout
*/
- public FlinkClient(final Map<?, ?> conf, final String host, final int port, final Integer timeout) {
+ public FlinkClient(final String host, final int port, final Integer timeout) {
this.jobManagerHost = host;
this.jobManagerPort = port;
- if(timeout != null) {
+ if (timeout != null) {
this.timeout = timeout + " ms";
} else {
this.timeout = null;
}
}
-
-
-
+
/**
- * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and
- * {@link Config#NIMBUS_THRIFT_PORT} as JobManager address.
- *
+ * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
+ * Config#NIMBUS_THRIFT_PORT} as JobManager address.
+ *
* @param conf
- * Configuration that contains the jobmanager's hostname and port.
- *
+ * Configuration that contains the jobmanager's hostname and port.
* @return A configured {@link FlinkClient}.
*/
- public static FlinkClient getConfiguredClient(@SuppressWarnings("rawtypes") final Map conf) {
- final String nimbusHost = (String)conf.get(Config.NIMBUS_HOST);
- final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
- return new FlinkClient(conf, nimbusHost, nimbusPort);
+ @SuppressWarnings("rawtypes")
+ public static FlinkClient getConfiguredClient(final Map conf) {
+ final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+ final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+ return new FlinkClient(nimbusHost, nimbusPort);
}
-
-
-
+
/**
* Return a reference to itself.
- *
+ * <p/>
* {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
- *
+ *
* @return A reference to itself.
*/
+ @SuppressWarnings("unused")
public FlinkClient getClient() {
return this;
}
-
+
public void close() {/* nothing to do */}
-
-
- /*
- * The following methods are derived from "backtype.storm.generated.Nimubs.Client"
- */
-
+
+ // The following methods are derived from "backtype.storm.generated.Nimubs.Client"
+
/**
* Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
* uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
*/
- public void submitTopology(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- this.submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, null);
+ public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
}
-
+
/**
* Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
* uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
*/
- public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology, final SubmitOptions options)
- throws AlreadyAliveException, InvalidTopologyException {
-
- if(this.getTopologyJobId(name) != null) {
+ public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
+ topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+
+ if (this.getTopologyJobId(name) != null) {
throw new AlreadyAliveException();
}
-
+
final File uploadedJarFile = new File(uploadedJarLocation);
try {
JobWithJars.checkJarFile(uploadedJarFile);
- } catch(final IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
}
-
+
final List<File> jarFiles = new ArrayList<File>();
jarFiles.add(uploadedJarFile);
-
+
final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
-
+
final Configuration configuration = jobGraph.getJobConfiguration();
-
+
final Client client;
try {
client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
- JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
- } catch(final UnknownHostException e) {
+ JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
+ } catch (final UnknownHostException e) {
throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
}
-
+
try {
client.run(jobGraph, false);
- } catch(final ProgramInvocationException e) {
+ } catch (final ProgramInvocationException e) {
throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
}
}
-
+
public void killTopology(final String name) throws NotAliveException {
this.killTopologyWithOpts(name, null);
}
-
+
public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
final JobID jobId = this.getTopologyJobId(name);
- if(jobId == null) {
+ if (jobId == null) {
throw new NotAliveException();
}
-
+
try {
final ActorRef jobManager = this.getJobManager();
-
- if(options != null) {
+
+ if (options != null) {
try {
Thread.sleep(1000 * options.get_wait_secs());
- } catch(final InterruptedException e) {
+ } catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}
-
+
final FiniteDuration askTimeout = this.getTimeout();
final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
try {
Await.result(response, askTimeout);
- } catch(final Exception e) {
- throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed.", e);
+ } catch (final Exception e) {
+ throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
}
- } catch(final IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
+ + ":" + this.jobManagerPort, e);
}
}
-
+
/**
* Package internal method to get a Flink {@link JobID} from a Storm topology name.
- *
+ *
* @param id
- * The Storm topology name.
- *
+ * The Storm topology name.
* @return Flink's internally used {@link JobID}.
*/
JobID getTopologyJobId(final String id) {
final Configuration configuration = GlobalConfiguration.getConfiguration();
- if(this.timeout != null) {
+ if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
}
-
+
try {
final ActorRef jobManager = this.getJobManager();
-
+
final FiniteDuration askTimeout = this.getTimeout();
final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
- new Timeout(askTimeout));
-
+ new Timeout(askTimeout));
+
Object result;
try {
result = Await.result(response, askTimeout);
- } catch(final Exception e) {
- throw new RuntimeException("Could not retrieve running jobs from the JobManager.", e);
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
}
-
- if(result instanceof RunningJobsStatus) {
- final List<JobStatusMessage> jobs = ((RunningJobsStatus)result).getStatusMessages();
-
- for(final JobStatusMessage status : jobs) {
- if(status.getJobName().equals(id)) {
+
+ if (result instanceof RunningJobsStatus) {
+ final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+
+ for (final JobStatusMessage status : jobs) {
+ if (status.getJobName().equals(id)) {
return status.getJobId();
}
}
} else {
throw new RuntimeException("ReqeustRunningJobs requires a response of type "
- + "RunningJobs. Instead the response is of type " + result.getClass() + ".");
+ + "RunningJobs. Instead the response is of type " + result.getClass() + ".");
}
- } catch(final IOException e) {
+ } catch (final IOException e) {
throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
- + ":" + this.jobManagerPort, e);
+ + ":" + this.jobManagerPort, e);
}
-
+
return null;
}
-
+
private FiniteDuration getTimeout() {
final Configuration configuration = GlobalConfiguration.getConfiguration();
- if(this.timeout != null) {
+ if (this.timeout != null) {
configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
}
-
+
return AkkaUtils.getTimeout(configuration);
}
-
+
private ActorRef getJobManager() throws IOException {
final Configuration configuration = GlobalConfiguration.getConfiguration();
-
+
ActorSystem actorSystem;
try {
- final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", new Integer(0));
+ final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
- systemEndpoint));
- } catch(final Exception e) {
+ systemEndpoint));
+ } catch (final Exception e) {
throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
}
-
+
return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
- actorSystem, AkkaUtils.getLookupTimeout(configuration));
+ actorSystem, AkkaUtils.getLookupTimeout(configuration));
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index fb88570..7160bc4 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.Map;
-import org.apache.flink.streaming.util.ClusterUtil;
+package org.apache.flink.stormcompatibility.api;
import backtype.storm.LocalCluster;
import backtype.storm.generated.ClusterSummary;
@@ -28,106 +25,96 @@ import backtype.storm.generated.RebalanceOptions;
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
+import org.apache.flink.streaming.util.ClusterUtil;
-
-
-
+import java.util.Map;
/**
* {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
*/
public class FlinkLocalCluster {
-
+
public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
- throws Exception {
+ throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
}
-
- public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology, final SubmitOptions submitOpts)
- throws Exception {
+
+ public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
+ final SubmitOptions submitOpts) throws Exception {
ClusterUtil
- .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+ .startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
}
-
+
public void killTopology(final String topologyName) {
this.killTopologyWithOpts(topologyName, null);
}
-
+
public void killTopologyWithOpts(final String name, final KillOptions options) {
- // TODO Auto-generated method stub
}
-
+
public void activate(final String topologyName) {
- // TODO Auto-generated method stub
}
-
+
public void deactivate(final String topologyName) {
- // TODO Auto-generated method stub
}
-
+
public void rebalance(final String name, final RebalanceOptions options) {
- // TODO Auto-generated method stub
}
-
+
public void shutdown() {
ClusterUtil.stopOnMiniCluster();
}
-
+
+ @SuppressWarnings("unused")
public String getTopologyConf(final String id) {
- // TODO Auto-generated method stub
return null;
}
-
+
+ @SuppressWarnings("unused")
public StormTopology getTopology(final String id) {
- // TODO Auto-generated method stub
return null;
}
-
+
+ @SuppressWarnings("unused")
public ClusterSummary getClusterInfo() {
- // TODO Auto-generated method stub
return null;
}
-
+
+ @SuppressWarnings("unused")
public TopologyInfo getTopologyInfo(final String id) {
- // TODO Auto-generated method stub
return null;
}
-
+
+ @SuppressWarnings("unused")
public Map<?, ?> getState() {
- // TODO Auto-generated method stub
return null;
}
-
-
-
- // the following is used to set a different execution environment for ITCases
- /**
- * A different {@link FlinkLocalCluster} to be used for execution.
- */
+
+ // A different {@link FlinkLocalCluster} to be used for execution of ITCases
private static FlinkLocalCluster currentCluster = null;
-
+
/**
- * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
- * {@link #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
- *
+ * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
+ * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+ *
* @return a {@link FlinkLocalCluster} to be used for execution
*/
public static FlinkLocalCluster getLocalCluster() {
- if(currentCluster == null) {
+ if (currentCluster == null) {
currentCluster = new FlinkLocalCluster();
}
-
+
return currentCluster;
}
-
+
/**
* Sets a different {@link FlinkLocalCluster} to be used for execution.
- *
+ *
* @param cluster
- * the {@link FlinkLocalCluster} to be used for execution
+ * the {@link FlinkLocalCluster} to be used for execution
*/
public static void initialize(final FlinkLocalCluster cluster) {
currentCluster = cluster;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index d371fc0..206db28 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -14,158 +14,153 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.List;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+package org.apache.flink.stormcompatibility.api;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-
-
+import java.util.List;
/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or
- * {@link IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore, direct emit is not
- * supported.</strong>
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or {@link
+ * IRichBolt bolt}.<br /> <br /> <strong>CAUTION: Currently, Flink does only support the default output stream.
+ * Furthermore, direct emit is not supported.</strong>
*/
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
private Fields outputSchema;
-
+
@Override
public void declare(final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
}
-
+
/**
* {@inheritDoc}
- *
+ * <p/>
* Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}.
- *
+ *
* @throws UnsupportedOperationException
- * if {@code direct} is {@code true}
+ * if {@code direct} is {@code true}
*/
@Override
public void declare(final boolean direct, final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
}
-
+
/**
* {@inheritDoc}
- *
+ * <p/>
* Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
* {@link Utils#DEFAULT_STREAM_ID}.
- *
+ *
* @throws UnsupportedOperationException
- * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
+ * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
*/
@Override
public void declareStream(final String streamId, final Fields fields) {
this.declareStream(streamId, false, fields);
}
-
+
/**
* {@inheritDoc}
- *
+ * <p/>
* Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
* {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is no supported by Flink and parameter {@code direct}
* must be {@code false}.
- *
+ *
* @throws UnsupportedOperationException
- * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+ * if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
*/
@Override
public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+ if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
}
- if(direct) {
+ if (direct) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}
-
+
this.outputSchema = fields;
}
-
+
/**
* Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
* {@code null} is returned.
- *
+ *
* @return output type information for the declared output schema; or {@code null} if no output schema was declared
- *
* @throws IllegalArgumentException
- * if more then 25 attributes are declared
+ * if more then 25 attributes are declared
*/
public TypeInformation<?> getOutputType() throws IllegalArgumentException {
- if((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+ if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
return null;
}
-
+
Tuple t;
final int numberOfAttributes = this.outputSchema.size();
-
- if(numberOfAttributes == 1) {
+
+ if (numberOfAttributes == 1) {
return TypeExtractor.getForClass(Object.class);
- } else if(numberOfAttributes <= 25) {
+ } else if (numberOfAttributes <= 25) {
try {
t = Tuple.getTupleClass(numberOfAttributes).newInstance();
- } catch(final InstantiationException e) {
+ } catch (final InstantiationException e) {
throw new RuntimeException(e);
- } catch(final IllegalAccessException e) {
+ } catch (final IllegalAccessException e) {
throw new RuntimeException(e);
}
} else {
- throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes.");
+ throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
}
-
+
// TODO: declare only key fields as DefaultComparable
- for(int i = 0; i < numberOfAttributes; ++i) {
+ for (int i = 0; i < numberOfAttributes; ++i) {
t.setField(new DefaultComparable(), i);
}
-
+
return TypeExtractor.getForObject(t);
}
-
+
/**
- * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct
- * {@link TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not
- * comparable, Flink cannot use them and will throw an exception.
- *
+ * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+ * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
+ * Flink cannot use them and will throw an exception.
+ *
* @author mjsax
*/
private static class DefaultComparable implements Comparable<DefaultComparable> {
-
- public DefaultComparable() {}
-
+
+ public DefaultComparable() {
+ }
+
+ @SuppressWarnings("NullableProblems")
@Override
public int compareTo(final DefaultComparable o) {
return 0;
}
}
-
+
/**
* Computes the indexes within the declared output schema, for a list of given field-grouping attributes.
- *
+ *
* @return array of {@code int}s that contains the index without the output schema for each attribute in the given
- * list
+ * list
*/
public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
final int[] fieldIndexes = new int[groupingFields.size()];
-
- for(int i = 0; i < fieldIndexes.length; ++i) {
+
+ for (int i = 0; i < fieldIndexes.length; ++i) {
fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
}
-
+
return fieldIndexes;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index efd0c46..1f37bf8 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -14,11 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import java.io.File;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.ConfigConstants;
@@ -28,218 +32,176 @@ import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.io.File;
+import java.util.Map;
/**
* {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
*/
public class FlinkSubmitter {
public static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
- *
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, (SubmitOptions)null, (FlinkProgressListener)null);
- }
-
+
/**
* Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
- *
+ *
* @param name
- * the name of the storm.
+ * the name of the storm.
* @param stormConf
- * the topology-specific configuration. See {@link Config}.
+ * the topology-specific configuration. See {@link Config}.
* @param topology
- * the processing to execute.
+ * the processing to execute.
* @param opts
- * to manipulate the starting of the topology.
+ * to manipulate the starting of the topology.
* @throws AlreadyAliveException
- * if a topology with this name is already running
+ * if a topology with this name is already running
* @throws InvalidTopologyException
- * if an invalid topology was submitted
+ * if an invalid topology was submitted
*/
- public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, opts, (FlinkProgressListener)null);
+ @SuppressWarnings("unused")
+ public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
+ final SubmitOptions opts)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology);
}
-
+
/**
- * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given
- * {@link FlinkProgressListener} is ignored because progress bars are not supported by Flink.
- *
- *
+ * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given {@link
+ * FlinkProgressListener} is ignored because progress bars are not supported by Flink.
+ *
* @param name
- * the name of the storm.
+ * the name of the storm.
* @param stormConf
- * the topology-specific configuration. See {@link Config}.
+ * the topology-specific configuration. See {@link Config}.
* @param topology
- * the processing to execute.
+ * the processing to execute.
* @param opts
- * to manipulate the starting of the topology
+ * to manipulate the starting of the topology
* @param progressListener
- * to track the progress of the jar upload process
+ * to track the progress of the jar upload process
* @throws AlreadyAliveException
- * if a topology with this name is already running
+ * if a topology with this name is already running
* @throws InvalidTopologyException
- * if an invalid topology was submitted
+ * if an invalid topology was submitted
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
- public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology, final SubmitOptions opts, final FlinkProgressListener progressListener)
- throws AlreadyAliveException, InvalidTopologyException {
- if(!Utils.isValidConf(stormConf)) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ if (!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
-
+
final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
- if(!stormConf.containsKey(Config.NIMBUS_HOST)) {
+ if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
stormConf.put(Config.NIMBUS_HOST,
- flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+ flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
}
- if(!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+ if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
stormConf.put(Config.NIMBUS_THRIFT_PORT,
- new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123)));
+ flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123));
}
-
+
final String serConf = JSONValue.toJSONString(stormConf);
-
+
final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
- if(client.getTopologyJobId(name) != null) {
+ if (client.getTopologyJobId(name) != null) {
throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
}
String localJar = System.getProperty("storm.jar");
- if(localJar == null) {
+ if (localJar == null) {
try {
- for(final File file : ((ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment()).getJars()) {
- // should only be one jar file...
- // TODO verify above assumption
+ for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+ .getJars()) {
+ // TODO verify that there is onnly one jar
localJar = file.getAbsolutePath();
}
- } catch(final ClassCastException e) {
+ } catch (final ClassCastException e) {
// ignore
}
}
try {
logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
- client.submitTopologyWithOpts(name, localJar, serConf, topology, opts);
- } catch(final InvalidTopologyException e) {
+ client.submitTopologyWithOpts(name, localJar, topology);
+ } catch (final InvalidTopologyException e) {
logger.warn("Topology submission exception: " + e.get_msg());
throw e;
- } catch(final AlreadyAliveException e) {
+ } catch (final AlreadyAliveException e) {
logger.warn("Topology already alive exception", e);
throw e;
} finally {
client.close();
}
-
+
logger.info("Finished submitting topology: " + name);
}
-
- /**
- * Same as {@link #submitTopology(String, Map, FlinkTopology)}. Progress bars are not supported by Flink.
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
-
- public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopologyWithProgressBar(name, stormConf, topology, null);
- }
-
+
/**
* Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
* Flink.
- *
+ *
* @param name
- * the name of the storm.
+ * the name of the storm.
* @param stormConf
- * the topology-specific configuration. See {@link Config}.
+ * the topology-specific configuration. See {@link Config}.
* @param topology
- * the processing to execute.
+ * the processing to execute.
* @param opts
- * to manipulate the starting of the topology
+ * to manipulate the starting of the topology
* @throws AlreadyAliveException
- * if a topology with this name is already running
+ * if a topology with this name is already running
* @throws InvalidTopologyException
- * if an invalid topology was submitted
+ * if an invalid topology was submitted
*/
-
- public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopology(name, stormConf, topology, opts, null);
+ @SuppressWarnings("unused")
+ public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
+ final FlinkTopology topology)
+ throws AlreadyAliveException, InvalidTopologyException {
+ submitTopology(name, stormConf, topology);
}
-
+
/**
* In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
* returned value is parameter localJar, because this give the best integration of Storm behavior within a Flink
* environment.
- *
+ *
* @param conf
- * the topology-specific configuration. See {@link Config}.
+ * the topology-specific configuration. See {@link Config}.
* @param localJar
- * file path of the jar file to submit
+ * file path of the jar file to submit
* @return the value of parameter localJar
*/
- public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar) {
- return submitJar(conf, localJar, (FlinkProgressListener)null);
+ @SuppressWarnings({"rawtypes", "unused"})
+ public static String submitJar(final Map conf, final String localJar) {
+ return submitJar(localJar);
}
-
+
/**
* In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
* returned value is parameter localJar, because this give the best integration of Storm behavior within a Flink
* environment.
- *
+ *
* @param conf
- * the topology-specific configuration. See {@link Config}.
+ * the topology-specific configuration. See {@link Config}.
* @param localJar
- * file path of the jar file to submit
+ * file path of the jar file to submit
* @param listener
- * progress listener to track the jar file upload
+ * progress listener to track the jar file upload
* @return the value of parameter localJar
*/
- public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar, final FlinkProgressListener listener) {
- if(localJar == null) {
+ @SuppressWarnings("rawtypes")
+ public static String submitJar(final String localJar) {
+ if (localJar == null) {
throw new RuntimeException(
- "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+ "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
+ "to upload");
}
-
+
return localJar;
}
-
+
/**
* Dummy interface use to track progress of file upload. Does not do anything. Kept for compatibility.
*/
- public interface FlinkProgressListener { /* empty */}
-
+ public interface FlinkProgressListener {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
index 5f4340d..ae0730e 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -15,98 +15,84 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.api;
+import backtype.storm.generated.StormTopology;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import backtype.storm.generated.StormTopology;
-
-
-
-
-
/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a
- * {@link StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a
- * {@link FlinkTopology} cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster},
- * {@link FlinkSubmitter}, or {@link FlinkClient}.
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
+ * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
+ * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+ * {@link FlinkClient}.
*/
class FlinkTopology extends StreamExecutionEnvironment {
- /**
- * The corresponding {@link StormTopology} that is mimiced by this {@link FlinkTopology}.
- */
+
+ // The corresponding {@link StormTopology} that is mimiced by this {@link FlinkTopology}
private final StormTopology stormTopology;
- /**
- * The number of declared tasks for the whole program (ie, sum over all dops)
- */
+ // The number of declared tasks for the whole program (ie, sum over all dops)
private int numberOfTasks = 0;
-
-
-
- /**
- * Instantiates a new {@link FlinkTestTopology}.
- */
+
public FlinkTopology(final StormTopology stormTopology) {
- // set default parallelism to 1, to mirror Storm default behavior
+ // Set default parallelism to 1, to mirror Storm default behavior
super.setParallelism(1);
this.stormTopology = stormTopology;
}
-
-
-
+
/**
- * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
- *
+ * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
+ * FlinkClient}.
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public JobExecutionResult execute() throws Exception {
throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+ "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+ "instead.");
}
-
+
/**
- * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or
- * {@link FlinkClient}.
- *
+ * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
+ * FlinkClient}.
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public JobExecutionResult execute(final String jobName) throws Exception {
throw new UnsupportedOperationException(
- "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+ "A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+ "instead.");
}
-
- /**
- * TODO
- */
+
+ //TODO
+ @SuppressWarnings("unused")
public String getStormTopologyAsString() {
return this.stormTopology.toString();
}
-
+
/**
* Increased the number of declared tasks of this program by the given value.
- *
+ *
* @param dop
- * The dop of a new operator that increases the number of overall tasks.
+ * The dop of a new operator that increases the number of overall tasks.
*/
public void increaseNumberOfTasks(final int dop) {
assert (dop > 0);
this.numberOfTasks += dop;
}
-
-
+
/**
* Return the number or required tasks to execute this program.
- *
+ *
* @return the number or required tasks to execute this program
*/
public int getNumberOfTasks() {
return this.numberOfTasks;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e8f3702..0f09351 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -15,21 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+package org.apache.flink.stormcompatibility.api;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.GlobalStreamId;
@@ -43,10 +30,19 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.IRichStateSpout;
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-
-
-
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
/**
* {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
@@ -57,23 +53,14 @@ import backtype.storm.topology.TopologyBuilder;
* supported.</strong>
*/
public class FlinkTopologyBuilder {
- /**
- * A Storm {@link TopologyBuilder} to build a real Storm topology.
- */
+
+ // A Storm {@link TopologyBuilder} to build a real Storm topology
private final TopologyBuilder stormBuilder = new TopologyBuilder();
- /**
- * All user spouts by their ID.
- */
+ // All user spouts by their ID
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
- /**
- * All user bolts by their ID.
- */
+ // All user bolts by their ID
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
-
- // TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
-
-
-
+
/**
* Creates a Flink program that used the specified spouts and bolts.
*/
@@ -82,150 +69,150 @@ public class FlinkTopologyBuilder {
final StormTopology stormTopolgoy = this.stormBuilder.createTopology();
final FlinkTopology env = new FlinkTopology(stormTopolgoy);
env.setParallelism(1);
-
- final HashMap<String, SingleOutputStreamOperator> availableOperators = new HashMap<String, SingleOutputStreamOperator>();
-
- for(final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+
+ final HashMap<String, SingleOutputStreamOperator> availableOperators =
+ new HashMap<String, SingleOutputStreamOperator>();
+
+ for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
final String spoutId = spout.getKey();
final IRichSpout userSpout = spout.getValue();
-
+
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userSpout.declareOutputFields(declarer);
-
- // TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
- // and StormCollector)
- // -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
- // the streams
+
+ /* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
+ * and StormCollector)
+ * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
+ * the streams
+ */
final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
availableOperators.put(spoutId, source);
-
+
int dop = 1;
final ComponentCommon common = stormTopolgoy.get_spouts().get(spoutId).get_common();
- if(common.is_set_parallelism_hint()) {
+ if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
source.setParallelism(dop);
}
env.increaseNumberOfTasks(dop);
}
-
-
-
+
final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
unprocessedBolts.putAll(this.bolts);
-
- final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
-
- // because we do not know the order in which an iterator steps over a set, we might process a consumer before
- // its producer
- // -> thus, we might need to repeat multiple times
- while(unprocessedBolts.size() > 0) {
-
+
+ final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt =
+ new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+ /* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+ * its producer
+ * ->thus, we might need to repeat multiple times
+ */
+ while (unprocessedBolts.size() > 0) {
+
final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
- while(boltsIterator.hasNext()) {
-
+ while (boltsIterator.hasNext()) {
+
final Entry<String, IRichBolt> bolt = boltsIterator.next();
final String boltId = bolt.getKey();
final IRichBolt userBolt = bolt.getValue();
-
+
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
userBolt.declareOutputFields(declarer);
-
+
final ComponentCommon common = stormTopolgoy.get_bolts().get(boltId).get_common();
-
+
Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessdInputsPerBolt.get(boltId);
- if(unprocessedInputs == null) {
+ if (unprocessedInputs == null) {
unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
unprocessedInputs.addAll(common.get_inputs().entrySet());
unprocessdInputsPerBolt.put(boltId, unprocessedInputs);
}
-
+
// connect each available producer to the current bolt
final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
- while(inputStreamsIterator.hasNext()) {
-
+ while (inputStreamsIterator.hasNext()) {
+
final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
final String producerId = inputStream.getKey().get_componentId();
-
+
DataStream<?> inputDataStream = availableOperators.get(producerId);
-
- if(inputDataStream != null) { // if producer was processed already
+
+ if (inputDataStream != null) {
+ // if producer was processed already
final Grouping grouping = inputStream.getValue();
- if(grouping.is_set_shuffle()) {
+ if (grouping.is_set_shuffle()) {
// Storm uses a round-robin shuffle strategy
inputDataStream = inputDataStream.distribute();
- } else if(grouping.is_set_fields()) {
+ } else if (grouping.is_set_fields()) {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
- if(fields.size() > 0) {
+ if (fields.size() > 0) {
inputDataStream = inputDataStream.groupBy(declarer.getGroupingFieldIndexes(grouping
- .get_fields()));
+ .get_fields()));
} else {
inputDataStream = inputDataStream.global();
}
- } else if(grouping.is_set_all()) {
+ } else if (grouping.is_set_all()) {
inputDataStream = inputDataStream.broadcast();
- } else if(grouping.is_set_local_or_shuffle()) {
- // nothing to do
- } else {
+ } else if (!grouping.is_set_local_or_shuffle()) {
throw new UnsupportedOperationException(
- "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+ "Flink only supports (local-or-)shuffle, fields, all, and global grouping");
}
-
+
final TypeInformation<?> outType = declarer.getOutputType();
-
- @SuppressWarnings("null")
+
final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
- new StormBoltWrapper(userBolt));
- if(outType != null) { // only for non-sink nodes
+ new StormBoltWrapper(userBolt));
+ if (outType != null) {
+ // only for non-sink nodes
availableOperators.put(boltId, operator);
}
-
+
int dop = 1;
- if(common.is_set_parallelism_hint()) {
+ if (common.is_set_parallelism_hint()) {
dop = common.get_parallelism_hint();
operator.setParallelism(dop);
}
env.increaseNumberOfTasks(dop);
-
+
inputStreamsIterator.remove();
}
}
-
- if(unprocessedInputs.size() == 0) { // all inputs are connected; processing bolt completed
+
+ if (unprocessedInputs.size() == 0) {
+ // all inputs are connected; processing bolt completed
boltsIterator.remove();
}
}
}
return env;
}
-
+
/**
* Define a new bolt in this topology with parallelism of just one thread.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
* @param bolt
- * the bolt
- *
+ * the bolt
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
return this.setBolt(id, bolt, null);
}
-
+
/**
* Define a new bolt in this topology with the specified amount of parallelism.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
* @param bolt
- * the bolt
+ * the bolt
* @param parallelism_hint
- * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
- * process somewhere around the cluster.
- *
+ * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+ * process somewhere around the cluster.
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
@@ -233,82 +220,85 @@ public class FlinkTopologyBuilder {
this.bolts.put(id, bolt);
return declarer;
}
-
+
/**
- * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+ * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
+ * kind
* of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
* @param bolt
- * the basic bolt
- *
+ * the basic bolt
* @return use the returned object to declare the inputs to this component
*/
+ @SuppressWarnings("unused")
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
return this.setBolt(id, bolt, null);
}
-
+
/**
- * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+ * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted
+ * kind
* of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
* achieve proper reliability in the topology.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this bolt's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this bolt's
+ * outputs.
* @param bolt
- * the basic bolt
+ * the basic bolt
* @param parallelism_hint
- * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
- * process somwehere around the cluster.
- *
+ * the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+ * process somwehere around the cluster.
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
}
-
+
/**
* Define a new spout in this topology.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this spout's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this spout's
+ * outputs.
* @param spout
- * the spout
+ * the spout
*/
public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
return this.setSpout(id, spout, null);
}
-
+
/**
* Define a new spout in this topology with the specified parallelism. If the spout declares itself as
* non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
- *
+ *
* @param id
- * the id of this component. This id is referenced by other components that want to consume this spout's
- * outputs.
+ * the id of this component. This id is referenced by other components that want to consume this spout's
+ * outputs.
* @param parallelism_hint
- * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
- * process somwehere around the cluster.
+ * the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+ * process somwehere around the cluster.
* @param spout
- * the spout
+ * the spout
*/
public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
this.spouts.put(id, spout);
return declarer;
}
-
- // not implemented by Storm 0.9.4
- // public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
- // this.stormBuilder.setStateSpout(id, stateSpout);
- // }
- // public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
- // this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
- // }
-
+
+ // TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+ /* not implemented by Storm 0.9.4
+ * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+ * this.stormBuilder.setStateSpout(id, stateSpout);
+ * }
+ * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+ * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+ * }
+ */
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
index 890f695..a761617 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.api;
-import java.util.Collection;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
@@ -29,137 +27,135 @@ import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.state.ISubscribedState;
import backtype.storm.task.TopologyContext;
-
-
-
+import java.util.Collection;
+import java.util.Map;
/**
* {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when
* a Storm topology is executed within Flink.
*/
public class FlinkTopologyContext extends TopologyContext {
-
-
-
+
/**
* Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
* for each parallel task
- *
+ *
* @param topology
- * The Storm topology that is currently executed
+ * The Storm topology that is currently executed
* @param taskToComponents
- * A map from task IDs to Component IDs
+ * A map from task IDs to Component IDs
* @param taskId
- * The ID of the task the context belongs to.
+ * The ID of the task the context belongs to.
*/
public FlinkTopologyContext(final StormTopology topology, final Map<Integer, String> taskToComponents,
- final Integer taskId) {
+ final Integer taskId) {
super(topology, null, taskToComponents, null, null, null, null, null, taskId, null, null, null, null, null,
- null, null);
+ null, null);
}
-
-
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public void addTaskHook(final ITaskHook hook) {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink.");
+ throw new UnsupportedOperationException("Task hooks are not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public Collection<ITaskHook> getHooks() {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink.");
+ throw new UnsupportedOperationException("Task hooks are not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public IMetric getRegisteredMetricByName(final String name) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink.");
-
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
+
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
+ @SuppressWarnings("rawtypes")
@Override
- public CombinedMetric registerMetric(final String name, @SuppressWarnings("rawtypes") final ICombiner combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+ public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
+ @SuppressWarnings("rawtypes")
@Override
- public ReducedMetric registerMetric(final String name, @SuppressWarnings("rawtypes") final IReducer combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+ public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+ throw new UnsupportedOperationException("Metrics are not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink.");
-
+ throw new UnsupportedOperationException("Not supported by Flink");
+
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink.");
+ throw new UnsupportedOperationException("Not supported by Flink");
}
-
+
/**
* Not supported by Flink.
- *
+ *
* @throws UnsupportedOperationException
- * at every invocation
+ * at every invocation
*/
@Override
- public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink.");
+ public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T
+ obj) {
+ throw new UnsupportedOperationException("Not supported by Flink");
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index f265d4c..5bc4635 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -14,21 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.stormcompatibility.wrappers;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.topology.IRichSpout;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.Collector;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
/**
* A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm bolt within a Flink
* Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -40,93 +36,83 @@ import backtype.storm.topology.IRichSpout;
*/
public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 4993283609095408765L;
-
- /**
- * The wrapped Storm {@link IRichSpout spout}.
- */
+
+ // The wrapped Storm {@link IRichSpout spout}
protected final IRichSpout spout;
- /**
- * Number of attributes of the bolt's output tuples.
- */
+ // Number of attributes of the bolt's output tuples
private final int numberOfAttributes;
- /**
- * The wrapper of the given Flink collector.
- */
+ // The wrapper of the given Flink collector
protected StormCollector<OUT> collector;
- /**
- * Indicates, if the source is still running or was canceled;
- */
+ // Indicates, if the source is still running or was canceled
protected boolean isRunning = true;
-
-
-
+
/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
* that it can be used within a Flink streaming program. The output type will be one of {@link Tuple1} to
* {@link Tuple25} depending on the spout's declared number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [1;25].
+ * If the number of declared output attributes is not with range [1;25].
*/
+ @SuppressWarnings("unused")
public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
this(spout, false);
}
-
+
/**
* Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
* that it can be used within a Flink streaming program. The output type can be any type if parameter
* {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
* {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
* number of attributes.
- *
+ *
* @param spout
- * The Storm {@link IRichSpout spout} to be used.
+ * The Storm {@link IRichSpout spout} to be used.
* @param rawOutput
- * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
+ * Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+ * of a raw type.
* @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
+ * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
+ * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
+ * [1;25].
*/
public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
this.spout = spout;
this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutput);
}
-
-
-
+
@Override
- public final void run(@SuppressWarnings("hiding") final Collector<OUT> collector) throws Exception {
+ public final void run(final Collector<OUT> collector) throws Exception {
this.collector = new StormCollector<OUT>(this.numberOfAttributes, collector);
this.spout.open(null,
- StormWrapperSetupHelper.convertToTopologyContext((StreamingRuntimeContext)super.getRuntimeContext(), true),
- new SpoutOutputCollector(this.collector));
+ StormWrapperSetupHelper
+ .convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+ new SpoutOutputCollector(this.collector));
this.spout.activate();
this.execute();
}
-
+
/**
* Needs to be implemented to call the given Spout's {@link IRichSpout#nextTuple() nextTuple()} method. This method
* might use a {@code while(true)}-loop to emit an infinite number of tuples.
*/
protected abstract void execute();
-
+
/**
* {@inheritDoc}
- *
+ * <p/>
* Sets the {@link #isRunning} flag to {@code false}.
*/
@Override
public void cancel() {
this.isRunning = false;
}
-
+
@Override
public void close() throws Exception {
this.spout.close();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
index e09f250..3dbc451 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
@@ -14,17 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.stormcompatibility.wrappers;
-import java.io.Serializable;
+package org.apache.flink.stormcompatibility.wrappers;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
-
-
-
+import java.io.Serializable;
/**
* {@link FlinkDummyRichFunction} has the only purpose to retrieve the {@link RuntimeContext} for
@@ -32,28 +29,24 @@ import org.apache.flink.configuration.Configuration;
*/
class FlinkDummyRichFunction implements RichFunction, Serializable {
private static final long serialVersionUID = 7992273349877302520L;
-
- /**
- * The runtime context of a Storm bolt.
- */
+
+ // The runtime context of a Storm bolt
private RuntimeContext context;
-
-
-
+
@Override
public void open(final Configuration parameters) throws Exception {/* nothing to do */}
-
+
@Override
public void close() throws Exception {/* nothing to do */}
-
+
@Override
public RuntimeContext getRuntimeContext() {
return this.context;
}
-
+
@Override
public void setRuntimeContext(final RuntimeContext t) {
this.context = t;
}
-
+
}