Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:02 UTC

[12/27] flink git commit: [storm-compat] Storm compatibility code cleanup

[storm-compat] Storm compatibility code cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e497a831
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e497a831
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e497a831

Branch: refs/heads/master
Commit: e497a831fab3c01ff8dc5992940b09e6427fa472
Parents: 9ff3cf0
Author: szape <ne...@gmail.com>
Authored: Thu May 21 10:15:20 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sun Jun 14 23:00:00 2015 +0200

----------------------------------------------------------------------
 .../stormcompatibility/api/FlinkClient.java     | 269 +++++++++----------
 .../api/FlinkLocalCluster.java                  |  87 +++---
 .../api/FlinkOutputFieldsDeclarer.java          | 115 ++++----
 .../stormcompatibility/api/FlinkSubmitter.java  | 216 ++++++---------
 .../stormcompatibility/api/FlinkTopology.java   |  82 +++---
 .../api/FlinkTopologyBuilder.java               | 262 +++++++++---------
 .../api/FlinkTopologyContext.java               | 114 ++++----
 .../wrappers/AbstractStormSpoutWrapper.java     |  76 +++---
 .../wrappers/FlinkDummyRichFunction.java        |  25 +-
 .../wrappers/StormBoltWrapper.java              |  87 +++---
 .../wrappers/StormCollector.java                | 123 ++++-----
 .../wrappers/StormFiniteSpoutWrapper.java       |  91 +++----
 .../wrappers/StormOutputFieldsDeclarer.java     |  37 ++-
 .../wrappers/StormSpoutWrapper.java             |  47 ++--
 .../stormcompatibility/wrappers/StormTuple.java | 127 +++++----
 .../wrappers/StormWrapperSetupHelper.java       |  95 ++++---
 .../api/FlinkOutputFieldsDeclarerTest.java      | 111 ++++----
 .../api/FlinkTopologyContextTest.java           |  34 ++-
 .../api/FlinkTopologyTest.java                  |  25 +-
 .../stormcompatibility/util/AbstractTest.java   |  13 +-
 .../wrappers/FiniteTestSpout.java               |  43 ++-
 .../wrappers/FlinkDummyRichFunctionTest.java    |  15 +-
 .../wrappers/StormBoltWrapperTest.java          | 127 ++++-----
 .../wrappers/StormCollectorTest.java            |  79 +++---
 .../wrappers/StormFiniteSpoutWrapperTest.java   |  64 +++--
 .../wrappers/StormOutputFieldsDeclarerTest.java |  38 ++-
 .../wrappers/StormSpoutWrapperTest.java         |  30 +--
 .../wrappers/StormTupleTest.java                | 190 +++++++------
 .../wrappers/StormWrapperSetupHelperTest.java   |  83 +++---
 .../wrappers/TestCollector.java                 |  17 +-
 .../src/assembly/word-count-storm.xml           |  94 +++----
 .../util/AbstractStormBoltSink.java             |  83 ++++++
 .../util/AbstractStormSpout.java                |  70 +++++
 .../util/OutputFormatter.java                   |  25 ++
 .../util/StormBoltFileSink.java                 |  72 +++++
 .../util/StormBoltPrintSink.java                |  45 ++++
 .../wordcount/BoltTokenizerWordCount.java       |  79 +++---
 .../wordcount/SpoutSourceWordCount.java         |  96 ++++---
 .../wordcount/StormWordCountLocal.java          |  43 ++-
 .../wordcount/StormWordCountRemoteByClient.java |  55 ++--
 .../StormWordCountRemoteBySubmitter.java        |  58 ++--
 .../wordcount/WordCountTopology.java            |  76 +++---
 .../stormoperators/AbstractStormBoltSink.java   |  80 ------
 .../stormoperators/AbstractStormSpout.java      |  75 ------
 .../stormoperators/StormBoltCounter.java        |  49 ++--
 .../stormoperators/StormBoltFileSink.java       |  77 ------
 .../stormoperators/StormBoltPrintSink.java      |  43 ---
 .../stormoperators/StormBoltTokenizer.java      |  38 ++-
 .../stormoperators/StormFileSpout.java          |  43 ++-
 .../stormoperators/StormInMemorySpout.java      |  19 +-
 .../WordCountOutputFormatter.java               |  41 +++
 .../api/FlinkTestCluster.java                   |  63 ++---
 .../wordcount/BoltTokenizerWordCountITCase.java |  18 +-
 .../wordcount/SpoutSourceWordCountITCase.java   |  18 +-
 .../wordcount/StormWordCountLocalITCase.java    |  18 +-
 55 files changed, 1919 insertions(+), 2181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
index 4efe02f..0f11d63 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
@@ -15,16 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
+import backtype.storm.Config;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.KillOptions;
+import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NotAliveException;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.JobWithJars;
@@ -40,282 +45,258 @@ import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-
 import scala.Some;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-
-
 
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
  * Flink's JobManager instead of Storm's Nimbus.
  */
 public class FlinkClient {
-	/**
-	 * The jobmanager's host name.
-	 */
+
+	// The jobmanager's host name
 	private final String jobManagerHost;
-	/**
-	 * The jobmanager's rpc port.
-	 */
+	// The jobmanager's rpc port
 	private final int jobManagerPort;
-	/**
-	 * The user specified timeout in milliseconds.
-	 */
+	// The user-specified timeout in milliseconds
 	private final String timeout;
-	
-	
-	
-	/*
-	 * The following methods are derived from "backtype.storm.utils.NimbusClient"
-	 */
-	
+
+	// The following methods are derived from "backtype.storm.utils.NimbusClient"
+
 	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for
-	 * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 * 
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+	 *
 	 * @param conf
-	 *            A configuration.
+	 * 		A configuration.
 	 * @param host
-	 *            The jobmanager's host name.
+	 * 		The jobmanager's host name.
 	 * @param port
-	 *            The jobmanager's rpc port.
+	 * 		The jobmanager's rpc port.
 	 */
-	public FlinkClient(final Map<?, ?> conf, final String host, final int port) {
-		this(conf, host, port, null);
+	public FlinkClient(final String host, final int port) {
+		this(host, port, null);
 	}
-	
+
 	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for
-	 * {@link Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 * 
+	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
+	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
+	 *
 	 * @param conf
-	 *            A configuration.
+	 * 		A configuration.
 	 * @param host
-	 *            The jobmanager's host name.
+	 * 		The jobmanager's host name.
 	 * @param port
-	 *            The jobmanager's rpc port.
+	 * 		The jobmanager's rpc port.
 	 * @param timeout
+	 * 		Timeout
 	 */
-	public FlinkClient(final Map<?, ?> conf, final String host, final int port, final Integer timeout) {
+	public FlinkClient(final String host, final int port, final Integer timeout) {
 		this.jobManagerHost = host;
 		this.jobManagerPort = port;
-		if(timeout != null) {
+		if (timeout != null) {
 			this.timeout = timeout + " ms";
 		} else {
 			this.timeout = null;
 		}
 	}
-	
-	
-	
+
 	/**
-	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and
-	 * {@link Config#NIMBUS_THRIFT_PORT} as JobManager address.
-	 * 
+	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
+	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
+	 *
 	 * @param conf
-	 *            Configuration that contains the jobmanager's hostname and port.
-	 * 
+	 * 		Configuration that contains the jobmanager's hostname and port.
 	 * @return A configured {@link FlinkClient}.
 	 */
-	public static FlinkClient getConfiguredClient(@SuppressWarnings("rawtypes") final Map conf) {
-		final String nimbusHost = (String)conf.get(Config.NIMBUS_HOST);
-		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
-		return new FlinkClient(conf, nimbusHost, nimbusPort);
+	@SuppressWarnings("rawtypes")
+	public static FlinkClient getConfiguredClient(final Map conf) {
+		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
+		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
+		return new FlinkClient(nimbusHost, nimbusPort);
 	}
-	
-	
-	
+
 	/**
 	 * Return a reference to itself.
-	 * 
+	 * <p/>
 	 * {@link FlinkClient} mimics both {@link NimbusClient} and {@link Nimbus}{@code .Client} at once.
-	 * 
+	 *
 	 * @return A reference to itself.
 	 */
+	@SuppressWarnings("unused")
 	public FlinkClient getClient() {
 		return this;
 	}
-	
+
 	public void close() {/* nothing to do */}
-	
-	
-	/*
-	 * The following methods are derived from "backtype.storm.generated.Nimubs.Client"
-	 */
-	
+
+	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
+
 	/**
 	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
 	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
 	 */
-	public void submitTopology(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology)
-		throws AlreadyAliveException, InvalidTopologyException {
-		this.submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, null);
+	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
 	}
-	
+
 	/**
 	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
 	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
 	 */
-	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final String jsonConf, final FlinkTopology topology, final SubmitOptions options)
-		throws AlreadyAliveException, InvalidTopologyException {
-		
-		if(this.getTopologyJobId(name) != null) {
+	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
+			topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+
+		if (this.getTopologyJobId(name) != null) {
 			throw new AlreadyAliveException();
 		}
-		
+
 		final File uploadedJarFile = new File(uploadedJarLocation);
 		try {
 			JobWithJars.checkJarFile(uploadedJarFile);
-		} catch(final IOException e) {
+		} catch (final IOException e) {
 			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
 		}
-		
+
 		final List<File> jarFiles = new ArrayList<File>();
 		jarFiles.add(uploadedJarFile);
-		
+
 		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
 		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
-		
+
 		final Configuration configuration = jobGraph.getJobConfiguration();
-		
+
 		final Client client;
 		try {
 			client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
-				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-		} catch(final UnknownHostException e) {
+					JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
+		} catch (final UnknownHostException e) {
 			throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
 		}
-		
+
 		try {
 			client.run(jobGraph, false);
-		} catch(final ProgramInvocationException e) {
+		} catch (final ProgramInvocationException e) {
 			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
 		}
 	}
-	
+
 	public void killTopology(final String name) throws NotAliveException {
 		this.killTopologyWithOpts(name, null);
 	}
-	
+
 	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
 		final JobID jobId = this.getTopologyJobId(name);
-		if(jobId == null) {
+		if (jobId == null) {
 			throw new NotAliveException();
 		}
-		
+
 		try {
 			final ActorRef jobManager = this.getJobManager();
-			
-			if(options != null) {
+
+			if (options != null) {
 				try {
 					Thread.sleep(1000 * options.get_wait_secs());
-				} catch(final InterruptedException e) {
+				} catch (final InterruptedException e) {
 					throw new RuntimeException(e);
 				}
 			}
-			
+
 			final FiniteDuration askTimeout = this.getTimeout();
 			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
 			try {
 				Await.result(response, askTimeout);
-			} catch(final Exception e) {
-				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed.", e);
+			} catch (final Exception e) {
+				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
 			}
-		} catch(final IOException e) {
+		} catch (final IOException e) {
 			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-				+ ":" + this.jobManagerPort, e);
+					+ ":" + this.jobManagerPort, e);
 		}
 	}
-	
+
 	/**
 	 * Package internal method to get a Flink {@link JobID} from a Storm topology name.
-	 * 
+	 *
 	 * @param id
-	 *            The Storm topology name.
-	 * 
+	 * 		The Storm topology name.
 	 * @return Flink's internally used {@link JobID}.
 	 */
 	JobID getTopologyJobId(final String id) {
 		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if(this.timeout != null) {
+		if (this.timeout != null) {
 			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
 		}
-		
+
 		try {
 			final ActorRef jobManager = this.getJobManager();
-			
+
 			final FiniteDuration askTimeout = this.getTimeout();
 			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
-				new Timeout(askTimeout));
-			
+					new Timeout(askTimeout));
+
 			Object result;
 			try {
 				result = Await.result(response, askTimeout);
-			} catch(final Exception e) {
-				throw new RuntimeException("Could not retrieve running jobs from the JobManager.", e);
+			} catch (final Exception e) {
+				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
 			}
-			
-			if(result instanceof RunningJobsStatus) {
-				final List<JobStatusMessage> jobs = ((RunningJobsStatus)result).getStatusMessages();
-				
-				for(final JobStatusMessage status : jobs) {
-					if(status.getJobName().equals(id)) {
+
+			if (result instanceof RunningJobsStatus) {
+				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
+
+				for (final JobStatusMessage status : jobs) {
+					if (status.getJobName().equals(id)) {
 						return status.getJobId();
 					}
 				}
 			} else {
 				throw new RuntimeException("RequestRunningJobs requires a response of type "
-					+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
+						+ "RunningJobs. Instead, the response is of type " + result.getClass() + ".");
 			}
-		} catch(final IOException e) {
+		} catch (final IOException e) {
 			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-				+ ":" + this.jobManagerPort, e);
+					+ ":" + this.jobManagerPort, e);
 		}
-		
+
 		return null;
 	}
-	
+
 	private FiniteDuration getTimeout() {
 		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if(this.timeout != null) {
+		if (this.timeout != null) {
 			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
 		}
-		
+
 		return AkkaUtils.getTimeout(configuration);
 	}
-	
+
 	private ActorRef getJobManager() throws IOException {
 		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		
+
 		ActorSystem actorSystem;
 		try {
-			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", new Integer(0));
+			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
 			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
-				systemEndpoint));
-		} catch(final Exception e) {
+					systemEndpoint));
+		} catch (final Exception e) {
 			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
 		}
-		
+
 		return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
-			actorSystem, AkkaUtils.getLookupTimeout(configuration));
+				actorSystem, AkkaUtils.getLookupTimeout(configuration));
 	}
-	
+
 }
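
For reference, a minimal sketch of how the reworked FlinkClient API fits together. Host, port, jar path, and topology name below are placeholders; FlinkTopologyBuilder#createTopology() is assumed to return a FlinkTopology (mirroring Storm's TopologyBuilder#createTopology), and the class sits in the same package because FlinkTopology is package-private in this commit:

package org.apache.flink.stormcompatibility.api;

import backtype.storm.Config;

import java.util.HashMap;
import java.util.Map;

public class FlinkClientExample {
	public static void main(final String[] args) throws Exception {
		// JobManager address, mirroring NIMBUS_HOST / NIMBUS_THRIFT_PORT
		final Map<String, Object> conf = new HashMap<String, Object>();
		conf.put(Config.NIMBUS_HOST, "localhost");
		conf.put(Config.NIMBUS_THRIFT_PORT, 6123);

		final FlinkClient client = FlinkClient.getConfiguredClient(conf);

		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// builder.setSpout(...); builder.setBolt(...);
		final FlinkTopology topology = builder.createTopology();

		// The jar path points to a local file; it is uploaded on submission
		client.submitTopology("myTopology", "/path/to/topology.jar", topology);

		// ... and eventually ...
		client.killTopology("myTopology");
		client.close();
	}
}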

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
index fb88570..7160bc4 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
@@ -15,11 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.Map;
 
-import org.apache.flink.streaming.util.ClusterUtil;
+package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.LocalCluster;
 import backtype.storm.generated.ClusterSummary;
@@ -28,106 +25,96 @@ import backtype.storm.generated.RebalanceOptions;
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.generated.TopologyInfo;
+import org.apache.flink.streaming.util.ClusterUtil;
 
-
-
-
+import java.util.Map;
 
 /**
  * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
  */
 public class FlinkLocalCluster {
-	
+
 	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
-		throws Exception {
+			throws Exception {
 		this.submitTopologyWithOpts(topologyName, conf, topology, null);
 	}
-	
-	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology, final SubmitOptions submitOpts)
-		throws Exception {
+
+	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
+			final SubmitOptions submitOpts) throws Exception {
 		ClusterUtil
-			.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
+				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
 	}
-	
+
 	public void killTopology(final String topologyName) {
 		this.killTopologyWithOpts(topologyName, null);
 	}
-	
+
 	public void killTopologyWithOpts(final String name, final KillOptions options) {
-		// TODO Auto-generated method stub
 	}
-	
+
 	public void activate(final String topologyName) {
-		// TODO Auto-generated method stub
 	}
-	
+
 	public void deactivate(final String topologyName) {
-		// TODO Auto-generated method stub
 	}
-	
+
 	public void rebalance(final String name, final RebalanceOptions options) {
-		// TODO Auto-generated method stub
 	}
-	
+
 	public void shutdown() {
 		ClusterUtil.stopOnMiniCluster();
 	}
-	
+
+	@SuppressWarnings("unused")
 	public String getTopologyConf(final String id) {
-		// TODO Auto-generated method stub
 		return null;
 	}
-	
+
+	@SuppressWarnings("unused")
 	public StormTopology getTopology(final String id) {
-		// TODO Auto-generated method stub
 		return null;
 	}
-	
+
+	@SuppressWarnings("unused")
 	public ClusterSummary getClusterInfo() {
-		// TODO Auto-generated method stub
 		return null;
 	}
-	
+
+	@SuppressWarnings("unused")
 	public TopologyInfo getTopologyInfo(final String id) {
-		// TODO Auto-generated method stub
 		return null;
 	}
-	
+
+	@SuppressWarnings("unused")
 	public Map<?, ?> getState() {
-		// TODO Auto-generated method stub
 		return null;
 	}
-	
-	
-	
-	// the following is used to set a different execution environment for ITCases
-	/**
-	 * A different {@link FlinkLocalCluster} to be used for execution.
-	 */
+
+	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
 	private static FlinkLocalCluster currentCluster = null;
-	
+
 	/**
-	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by
-	 * {@link #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
-	 * 
+	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
+	 * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
+	 *
 	 * @return a {@link FlinkLocalCluster} to be used for execution
 	 */
 	public static FlinkLocalCluster getLocalCluster() {
-		if(currentCluster == null) {
+		if (currentCluster == null) {
 			currentCluster = new FlinkLocalCluster();
 		}
-		
+
 		return currentCluster;
 	}
-	
+
 	/**
 	 * Sets a different {@link FlinkLocalCluster} to be used for execution.
-	 * 
+	 *
 	 * @param cluster
-	 *            the {@link FlinkLocalCluster} to be used for execution
+	 * 		the {@link FlinkLocalCluster} to be used for execution
 	 */
 	public static void initialize(final FlinkLocalCluster cluster) {
 		currentCluster = cluster;
 	}
-	
+
 }
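
A corresponding sketch of the FlinkLocalCluster lifecycle (as the ITCases use it); the empty configuration and the sleep are only illustrative, and the topology is assumed to be built via FlinkTopologyBuilder as above:

package org.apache.flink.stormcompatibility.api;

import java.util.HashMap;

public class FlinkLocalClusterExample {
	public static void main(final String[] args) throws Exception {
		// Returns the cluster set via initialize(), or a fresh one
		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();

		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// builder.setSpout(...); builder.setBolt(...);
		final FlinkTopology topology = builder.createTopology();

		cluster.submitTopology("wordCount", new HashMap<String, Object>(), topology);
		Thread.sleep(5 * 1000); // let the job run on the mini cluster

		cluster.killTopology("wordCount"); // currently a no-op
		cluster.shutdown(); // stops the mini cluster
	}
}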

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
index d371fc0..206db28 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
@@ -14,158 +14,153 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.List;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
+package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 
-
-
-
+import java.util.List;
 
 /**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or
- * {@link IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore, direct emit is not
- * supported.</strong>
+ * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or {@link
+ * IRichBolt bolt}.<br /> <br /> <strong>CAUTION: Currently, Flink only supports the default output stream.
+ * Furthermore, direct emit is not supported.</strong>
  */
 final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
 	private Fields outputSchema;
-	
+
 	@Override
 	public void declare(final Fields fields) {
 		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
 	}
-	
+
 	/**
 	 * {@inheritDoc}
-	 * 
+	 * <p/>
 	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             if {@code direct} is {@code true}
+	 * 		if {@code direct} is {@code true}
 	 */
 	@Override
 	public void declare(final boolean direct, final Fields fields) {
 		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
 	}
-	
+
 	/**
 	 * {@inheritDoc}
-	 * 
+	 * <p/>
 	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
 	 * {@link Utils#DEFAULT_STREAM_ID}.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
+	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
 	 */
 	@Override
 	public void declareStream(final String streamId, final Fields fields) {
 		this.declareStream(streamId, false, fields);
 	}
-	
+
 	/**
 	 * {@inheritDoc}
-	 * 
+	 * <p/>
 	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
 	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
 	 * must be {@code false}.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
+	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
 	 */
 	@Override
 	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
 			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
 		}
-		if(direct) {
+		if (direct) {
 			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 		}
-		
+
 		this.outputSchema = fields;
 	}
-	
+
 	/**
 	 * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
 	 * {@code null} is returned.
-	 * 
+	 *
 	 * @return output type information for the declared output schema; or {@code null} if no output schema was declared
-	 * 
 	 * @throws IllegalArgumentException
-	 *             if more then 25 attributes are declared
+	 * 		if more than 25 attributes are declared
 	 */
 	public TypeInformation<?> getOutputType() throws IllegalArgumentException {
-		if((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
+		if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
 			return null;
 		}
-		
+
 		Tuple t;
 		final int numberOfAttributes = this.outputSchema.size();
-		
-		if(numberOfAttributes == 1) {
+
+		if (numberOfAttributes == 1) {
 			return TypeExtractor.getForClass(Object.class);
-		} else if(numberOfAttributes <= 25) {
+		} else if (numberOfAttributes <= 25) {
 			try {
 				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
-			} catch(final InstantiationException e) {
+			} catch (final InstantiationException e) {
 				throw new RuntimeException(e);
-			} catch(final IllegalAccessException e) {
+			} catch (final IllegalAccessException e) {
 				throw new RuntimeException(e);
 			}
 		} else {
-			throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes.");
+			throw new IllegalArgumentException("Flink supports only a maximum of 25 attributes");
 		}
-		
+
 		// TODO: declare only key fields as DefaultComparable
-		for(int i = 0; i < numberOfAttributes; ++i) {
+		for (int i = 0; i < numberOfAttributes; ++i) {
 			t.setField(new DefaultComparable(), i);
 		}
-		
+
 		return TypeExtractor.getForObject(t);
 	}
-	
+
 	/**
-	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct
-	 * {@link TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not
-	 * comparable, Flink cannot use them and will throw an exception.
-	 * 
+	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
+	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
+	 * Flink cannot use them and will throw an exception.
+	 *
 	 * @author mjsax
 	 */
 	private static class DefaultComparable implements Comparable<DefaultComparable> {
-		
-		public DefaultComparable() {}
-		
+
+		public DefaultComparable() {
+		}
+
+		@SuppressWarnings("NullableProblems")
 		@Override
 		public int compareTo(final DefaultComparable o) {
 			return 0;
 		}
 	}
-	
+
 	/**
 	 * Computes the indexes within the declared output schema, for a list of given field-grouping attributes.
-	 * 
+	 *
 	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
-	 *         list
+	 * list
 	 */
 	public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
 		final int[] fieldIndexes = new int[groupingFields.size()];
-		
-		for(int i = 0; i < fieldIndexes.length; ++i) {
+
+		for (int i = 0; i < fieldIndexes.length; ++i) {
 			fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
 		}
-		
+
 		return fieldIndexes;
 	}
-	
+
 }
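
A short sketch of the declarer contract described above; the field names are placeholders, and the example sits in the same package because FlinkOutputFieldsDeclarer is package-private:

package org.apache.flink.stormcompatibility.api;

import backtype.storm.tuple.Fields;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.Arrays;

public class FlinkOutputFieldsDeclarerExample {
	public static void main(final String[] args) {
		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();

		// Declare a two-attribute schema on the default output stream
		declarer.declare(new Fields("word", "count"));

		// Tuple2 type information derived from the declared schema
		final TypeInformation<?> outputType = declarer.getOutputType();
		System.out.println(outputType);

		// Index of each grouping attribute within the declared schema: [1]
		System.out.println(Arrays.toString(
				declarer.getGroupingFieldIndexes(Arrays.asList("count"))));
	}
}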

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
index efd0c46..1f37bf8 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
@@ -14,11 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import java.io.File;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
 
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.AlreadyAliveException;
+import backtype.storm.generated.InvalidTopologyException;
+import backtype.storm.generated.SubmitOptions;
+import backtype.storm.utils.Utils;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.configuration.ConfigConstants;
@@ -28,218 +32,176 @@ import org.json.simple.JSONValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.io.File;
+import java.util.Map;
 
 /**
  * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
  */
 public class FlinkSubmitter {
 	public static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
-	
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
-	 * 
-	 * 
-	 * @param name
-	 *            the name of the storm.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
-		throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology, (SubmitOptions)null, (FlinkProgressListener)null);
-	}
-	
+
 	/**
 	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
-	 * 
+	 *
 	 * @param name
-	 *            the name of the storm.
+	 * 		the name of the storm.
 	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
+	 * 		the topology-specific configuration. See {@link Config}.
 	 * @param topology
-	 *            the processing to execute.
+	 * 		the processing to execute.
 	 * @param opts
-	 *            to manipulate the starting of the topology.
+	 * 		to manipulate the starting of the topology.
 	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
+	 * 		if a topology with this name is already running
 	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
+	 * 		if an invalid topology was submitted
 	 */
-	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
-		throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology, opts, (FlinkProgressListener)null);
+	@SuppressWarnings("unused")
+	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
+			final SubmitOptions opts)
+			throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
 	}
-	
+
 	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given
-	 * {@link FlinkProgressListener} is ignored because progress bars are not supported by Flink.
-	 * 
-	 * 
+	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given {@link
+	 * FlinkProgressListener} is ignored because progress bars are not supported by Flink.
+	 *
 	 * @param name
-	 *            the name of the storm.
+	 * 		the name of the storm.
 	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
+	 * 		the topology-specific configuration. See {@link Config}.
 	 * @param topology
-	 *            the processing to execute.
+	 * 		the processing to execute.
 	 * @param opts
-	 *            to manipulate the starting of the topology
+	 * 		to manipulate the starting of the topology
 	 * @param progressListener
-	 *            to track the progress of the jar upload process
+	 * 		to track the progress of the jar upload process
 	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
+	 * 		if a topology with this name is already running
 	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
+	 * 		if an invalid topology was submitted
 	 */
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology, final SubmitOptions opts, final FlinkProgressListener progressListener)
-		throws AlreadyAliveException, InvalidTopologyException {
-		if(!Utils.isValidConf(stormConf)) {
+	@SuppressWarnings({"rawtypes", "unchecked"})
+	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		if (!Utils.isValidConf(stormConf)) {
 			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
 		}
-		
+
 		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
-		if(!stormConf.containsKey(Config.NIMBUS_HOST)) {
+		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
 			stormConf.put(Config.NIMBUS_HOST,
-				flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
+					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
 		}
-		if(!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
+		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
 			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-				new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123)));
+					flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123));
 		}
-		
+
 		final String serConf = JSONValue.toJSONString(stormConf);
-		
+
 		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
-		if(client.getTopologyJobId(name) != null) {
+		if (client.getTopologyJobId(name) != null) {
 			throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
 		}
 		String localJar = System.getProperty("storm.jar");
-		if(localJar == null) {
+		if (localJar == null) {
 			try {
-				for(final File file : ((ContextEnvironment)ExecutionEnvironment.getExecutionEnvironment()).getJars()) {
-					// should only be one jar file...
-					// TODO verify above assumption
+				for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
+						.getJars()) {
+					// TODO verify that there is only one jar
 					localJar = file.getAbsolutePath();
 				}
-			} catch(final ClassCastException e) {
+			} catch (final ClassCastException e) {
 				// ignore
 			}
 		}
 		try {
 			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-			client.submitTopologyWithOpts(name, localJar, serConf, topology, opts);
-		} catch(final InvalidTopologyException e) {
+			client.submitTopologyWithOpts(name, localJar, topology);
+		} catch (final InvalidTopologyException e) {
 			logger.warn("Topology submission exception: " + e.get_msg());
 			throw e;
-		} catch(final AlreadyAliveException e) {
+		} catch (final AlreadyAliveException e) {
 			logger.warn("Topology already alive exception", e);
 			throw e;
 		} finally {
 			client.close();
 		}
-		
+
 		logger.info("Finished submitting topology: " + name);
 	}
-	
-	/**
-	 * Same as {@link #submitTopology(String, Map, FlinkTopology)}. Progress bars are not supported by Flink.
-	 * 
-	 * @param name
-	 *            the name of the storm.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	
-	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology)
-		throws AlreadyAliveException, InvalidTopologyException {
-		submitTopologyWithProgressBar(name, stormConf, topology, null);
-	}
-	
+
 	/**
 	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
 	 * Flink.
-	 * 
+	 *
 	 * @param name
-	 *            the name of the storm.
+	 * 		the name of the storm.
 	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
+	 * 		the topology-specific configuration. See {@link Config}.
 	 * @param topology
-	 *            the processing to execute.
+	 * 		the processing to execute.
 	 * @param opts
-	 *            to manipulate the starting of the topology
+	 * 		to manipulate the starting of the topology
 	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
+	 * 		if a topology with this name is already running
 	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
+	 * 		if an invalid topology was submitted
 	 */
-	
-	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf, final FlinkTopology topology, final SubmitOptions opts)
-		throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology, opts, null);
+	@SuppressWarnings("unused")
+	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
+			final FlinkTopology topology)
+			throws AlreadyAliveException, InvalidTopologyException {
+		submitTopology(name, stormConf, topology);
 	}
-	
+
 	/**
 	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
 	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
 	 * environment.
-	 * 
+	 *
 	 * @param conf
-	 *            the topology-specific configuration. See {@link Config}.
+	 * 		the topology-specific configuration. See {@link Config}.
 	 * @param localJar
-	 *            file path of the jar file to submit
+	 * 		file path of the jar file to submit
 	 * @return the value of parameter localJar
 	 */
-	public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar) {
-		return submitJar(conf, localJar, (FlinkProgressListener)null);
+	@SuppressWarnings({"rawtypes", "unused"})
+	public static String submitJar(final Map conf, final String localJar) {
+		return submitJar(localJar);
 	}
-	
+
 	/**
 	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
 	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
 	 * environment.
-	 * 
+	 *
 	 * @param conf
-	 *            the topology-specific configuration. See {@link Config}.
+	 * 		the topology-specific configuration. See {@link Config}.
 	 * @param localJar
-	 *            file path of the jar file to submit
+	 * 		file path of the jar file to submit
 	 * @param listener
-	 *            progress listener to track the jar file upload
+	 * 		progress listener to track the jar file upload
 	 * @return the value of parameter localJar
 	 */
-	public static String submitJar(@SuppressWarnings("rawtypes") final Map conf, final String localJar, final FlinkProgressListener listener) {
-		if(localJar == null) {
+	@SuppressWarnings("rawtypes")
+	public static String submitJar(final String localJar) {
+		if (localJar == null) {
 			throw new RuntimeException(
-				"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
+					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
+							"to upload");
 		}
-		
+
 		return localJar;
 	}
-	
+
 	/**
 	 * Dummy interface used to track progress of file upload. Does not do anything. Kept for compatibility.
 	 */
-	public interface FlinkProgressListener { /* empty */}
-	
+	public interface FlinkProgressListener {
+	}
+
 }
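
A sketch of remote submission via FlinkSubmitter; the jar path is a placeholder, and setting "storm.jar" manually stands in for running through the Flink CLI, where the jar would be picked up from the ContextEnvironment instead:

package org.apache.flink.stormcompatibility.api;

import backtype.storm.Config;

public class FlinkSubmitterExample {
	public static void main(final String[] args) throws Exception {
		// Config is json-serializable; NIMBUS_HOST / NIMBUS_THRIFT_PORT
		// default to the JobManager address from the Flink configuration
		final Config conf = new Config();

		final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
		// builder.setSpout(...); builder.setBolt(...);
		final FlinkTopology topology = builder.createTopology();

		// Tell the submitter which local jar to ship
		System.setProperty("storm.jar", "/path/to/topology.jar");

		FlinkSubmitter.submitTopology("myTopology", conf, topology);
	}
}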

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
index 5f4340d..ae0730e 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
@@ -15,98 +15,84 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.api;
 
+import backtype.storm.generated.StormTopology;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
-import backtype.storm.generated.StormTopology;
-
-
-
-
-
 /**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a
- * {@link StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a
- * {@link FlinkTopology} cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster},
- * {@link FlinkSubmitter}, or {@link FlinkClient}.
+ * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
+ * StreamExecutionEnvironment}. In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
+ * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
+ * {@link FlinkClient}.
  */
 class FlinkTopology extends StreamExecutionEnvironment {
-	/**
-	 * The corresponding {@link StormTopology} that is mimiced by this {@link FlinkTopology}.
-	 */
+
+	// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
 	private final StormTopology stormTopology;
-	/**
-	 * The number of declared tasks for the whole program (ie, sum over all dops)
-	 */
+	// The number of declared tasks for the whole program (i.e., sum over all dops)
 	private int numberOfTasks = 0;
-	
-	
-	
-	/**
-	 * Instantiates a new {@link FlinkTestTopology}.
-	 */
+
 	public FlinkTopology(final StormTopology stormTopology) {
-		// set default parallelism to 1, to mirror Storm default behavior
+		// Set default parallelism to 1, to mirror Storm default behavior
 		super.setParallelism(1);
 		this.stormTopology = stormTopology;
 	}
-	
-	
-	
+
 	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
-	 * {@link FlinkClient}.
-	 * 
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
+	 * FlinkClient}.
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public JobExecutionResult execute() throws Exception {
 		throw new UnsupportedOperationException(
-			"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+						"instead.");
 	}
-	
+
 	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or
-	 * {@link FlinkClient}.
-	 * 
+	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
+	 * FlinkClient}.
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public JobExecutionResult execute(final String jobName) throws Exception {
 		throw new UnsupportedOperationException(
-			"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient instead.");
+				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
+						"instead.");
 	}
-	
-	/**
-	 * TODO
-	 */
+
+	//TODO
+	@SuppressWarnings("unused")
 	public String getStormTopologyAsString() {
 		return this.stormTopology.toString();
 	}
-	
+
 	/**
 	 * Increases the number of declared tasks of this program by the given value.
-	 * 
+	 *
 	 * @param dop
-	 *            The dop of a new operator that increases the number of overall tasks.
+	 * 		The dop of a new operator that increases the number of overall tasks.
 	 */
 	public void increaseNumberOfTasks(final int dop) {
 		assert (dop > 0);
 		this.numberOfTasks += dop;
 	}
-	
-	
+
 	/**
 	 * Returns the number of required tasks to execute this program.
-	 * 
+	 *
 	 * @return the number of required tasks to execute this program
 	 */
 	public int getNumberOfTasks() {
 		return this.numberOfTasks;
 	}
-	
+
 }
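
The execution restriction above in one compact sketch: calling execute() always fails, and the topology has to be handed to one of the three entry points instead (topology construction assumed as before):

package org.apache.flink.stormcompatibility.api;

import java.util.HashMap;

public class FlinkTopologyExecutionExample {
	public static void main(final String[] args) throws Exception {
		final FlinkTopology topology = new FlinkTopologyBuilder().createTopology();

		try {
			topology.execute(); // rejected at every invocation
		} catch (final UnsupportedOperationException e) {
			// expected: use FlinkLocalCluster, FlinkSubmitter, or FlinkClient
		}

		FlinkLocalCluster.getLocalCluster()
				.submitTopology("example", new HashMap<String, Object>(), topology);
	}
}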

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
index e8f3702..0f09351 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilder.java
@@ -15,21 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
-import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.generated.ComponentCommon;
 import backtype.storm.generated.GlobalStreamId;
@@ -43,10 +30,19 @@ import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.IRichStateSpout;
 import backtype.storm.topology.SpoutDeclarer;
 import backtype.storm.topology.TopologyBuilder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.stormcompatibility.wrappers.StormBoltWrapper;
+import org.apache.flink.stormcompatibility.wrappers.StormSpoutWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
-
-
-
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * {@link FlinkTopologyBuilder} mimics a {@link TopologyBuilder}, but builds a Flink program instead of a Storm
@@ -57,23 +53,14 @@ import backtype.storm.topology.TopologyBuilder;
  * supported.</strong>
  */
 public class FlinkTopologyBuilder {
-	/**
-	 * A Storm {@link TopologyBuilder} to build a real Storm topology.
-	 */
+
+	// A Storm {@link TopologyBuilder} to build a real Storm topology
 	private final TopologyBuilder stormBuilder = new TopologyBuilder();
-	/**
-	 * All user spouts by their ID.
-	 */
+	// All user spouts by their ID
 	private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
-	/**
-	 * All user bolts by their ID.
-	 */
+	// All user bolts by their ID
 	private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();
-	
-	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
-	
-	
-	
+
 	/**
 	 * Creates a Flink program that uses the specified spouts and bolts.
 	 */
@@ -82,150 +69,150 @@ public class FlinkTopologyBuilder {
 		final StormTopology stormTopolgoy = this.stormBuilder.createTopology();
 		final FlinkTopology env = new FlinkTopology(stormTopolgoy);
 		env.setParallelism(1);
-		
-		final HashMap<String, SingleOutputStreamOperator> availableOperators = new HashMap<String, SingleOutputStreamOperator>();
-		
-		for(final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
+
+		final HashMap<String, SingleOutputStreamOperator> availableOperators =
+				new HashMap<String, SingleOutputStreamOperator>();
+
+		for (final Entry<String, IRichSpout> spout : this.spouts.entrySet()) {
 			final String spoutId = spout.getKey();
 			final IRichSpout userSpout = spout.getValue();
-			
+
 			final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 			userSpout.declareOutputFields(declarer);
-			
-			// TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
-			// and StormCollector)
-			// -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
-			// the streams
+
+			/* TODO in order to support multiple output streams, use an additional wrapper (or modify StormSpoutWrapper
+			 * and StormCollector)
+			 * -> add an additional output attribute tagging the output stream, and use .split() and .select() to split
+			 * the streams
+			 */
 			final DataStreamSource source = env.addSource(new StormSpoutWrapper(userSpout), declarer.getOutputType());
 			availableOperators.put(spoutId, source);
-			
+
 			int dop = 1;
 			final ComponentCommon common = stormTopology.get_spouts().get(spoutId).get_common();
-			if(common.is_set_parallelism_hint()) {
+			if (common.is_set_parallelism_hint()) {
 				dop = common.get_parallelism_hint();
 				source.setParallelism(dop);
 			}
 			env.increaseNumberOfTasks(dop);
 		}
-		
-		
-		
+
 		final HashMap<String, IRichBolt> unprocessedBolts = new HashMap<String, IRichBolt>();
 		unprocessedBolts.putAll(this.bolts);
-		
-		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessdInputsPerBolt = new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
-		
-		// because we do not know the order in which an iterator steps over a set, we might process a consumer before
-		// its producer
-		// -> thus, we might need to repeat multiple times
-		while(unprocessedBolts.size() > 0) {
-			
+
+		final HashMap<String, Set<Entry<GlobalStreamId, Grouping>>> unprocessedInputsPerBolt =
+				new HashMap<String, Set<Entry<GlobalStreamId, Grouping>>>();
+
+		/* Because we do not know the order in which an iterator steps over a set, we might process a consumer before
+		 * its producer
+		 * -> thus, we might need to repeat multiple times
+		 */
+		while (unprocessedBolts.size() > 0) {
+
 			final Iterator<Entry<String, IRichBolt>> boltsIterator = unprocessedBolts.entrySet().iterator();
-			while(boltsIterator.hasNext()) {
-				
+			while (boltsIterator.hasNext()) {
+
 				final Entry<String, IRichBolt> bolt = boltsIterator.next();
 				final String boltId = bolt.getKey();
 				final IRichBolt userBolt = bolt.getValue();
-				
+
 				final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
 				userBolt.declareOutputFields(declarer);
-				
+
 				final ComponentCommon common = stormTopology.get_bolts().get(boltId).get_common();
-				
+
 				Set<Entry<GlobalStreamId, Grouping>> unprocessedInputs = unprocessedInputsPerBolt.get(boltId);
-				if(unprocessedInputs == null) {
+				if (unprocessedInputs == null) {
 					unprocessedInputs = new HashSet<Entry<GlobalStreamId, Grouping>>();
 					unprocessedInputs.addAll(common.get_inputs().entrySet());
 					unprocessedInputsPerBolt.put(boltId, unprocessedInputs);
 				}
-				
+
 				// connect each available producer to the current bolt
 				final Iterator<Entry<GlobalStreamId, Grouping>> inputStreamsIterator = unprocessedInputs.iterator();
-				while(inputStreamsIterator.hasNext()) {
-					
+				while (inputStreamsIterator.hasNext()) {
+
 					final Entry<GlobalStreamId, Grouping> inputStream = inputStreamsIterator.next();
 					final String producerId = inputStream.getKey().get_componentId();
-					
+
 					DataStream<?> inputDataStream = availableOperators.get(producerId);
-					
-					if(inputDataStream != null) { // if producer was processed already
+
+					if (inputDataStream != null) {
+						// if producer was processed already
 						final Grouping grouping = inputStream.getValue();
-						if(grouping.is_set_shuffle()) {
+						if (grouping.is_set_shuffle()) {
 							// Storm uses a round-robin shuffle strategy
 							inputDataStream = inputDataStream.distribute();
-						} else if(grouping.is_set_fields()) {
+						} else if (grouping.is_set_fields()) {
 							// global grouping is emulated in Storm via an empty fields grouping list
 							final List<String> fields = grouping.get_fields();
-							if(fields.size() > 0) {
+							if (fields.size() > 0) {
 								inputDataStream = inputDataStream.groupBy(declarer.getGroupingFieldIndexes(grouping
-									.get_fields()));
+										.get_fields()));
 							} else {
 								inputDataStream = inputDataStream.global();
 							}
-						} else if(grouping.is_set_all()) {
+						} else if (grouping.is_set_all()) {
 							inputDataStream = inputDataStream.broadcast();
-						} else if(grouping.is_set_local_or_shuffle()) {
-							// nothing to do
-						} else {
+						} else if (!grouping.is_set_local_or_shuffle()) {
 							throw new UnsupportedOperationException(
-								"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
+									"Flink only supports (local-or-)shuffle, fields, all, and global grouping");
 						}
-						
+
 						final TypeInformation<?> outType = declarer.getOutputType();
-						
-						@SuppressWarnings("null")
+
 						final SingleOutputStreamOperator operator = inputDataStream.transform(boltId, outType,
-							new StormBoltWrapper(userBolt));
-						if(outType != null) { // only for non-sink nodes
+								new StormBoltWrapper(userBolt));
+						if (outType != null) {
+							// only for non-sink nodes
 							availableOperators.put(boltId, operator);
 						}
-						
+
 						int dop = 1;
-						if(common.is_set_parallelism_hint()) {
+						if (common.is_set_parallelism_hint()) {
 							dop = common.get_parallelism_hint();
 							operator.setParallelism(dop);
 						}
 						env.increaseNumberOfTasks(dop);
-						
+
 						inputStreamsIterator.remove();
 					}
 				}
-				
-				if(unprocessedInputs.size() == 0) { // all inputs are connected; processing bolt completed
+
+				if (unprocessedInputs.size() == 0) {
+					// all inputs are connected; processing bolt completed
 					boltsIterator.remove();
 				}
 			}
 		}
 		return env;
 	}
-	
+
 	/**
 	 * Define a new bolt in this topology with parallelism of just one thread.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
 	 * @param bolt
-	 *            the bolt
-	 * 
+	 * 		the bolt
 	 * @return use the returned object to declare the inputs to this component
 	 */
 	public BoltDeclarer setBolt(final String id, final IRichBolt bolt) {
 		return this.setBolt(id, bolt, null);
 	}
-	
+
 	/**
 	 * Define a new bolt in this topology with the specified amount of parallelism.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
 	 * @param bolt
-	 *            the bolt
+	 * 		the bolt
 	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
-	 *            process somewhere around the cluster.
-	 * 
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
 	 * @return use the returned object to declare the inputs to this component
 	 */
 	public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number parallelism_hint) {
@@ -233,82 +220,85 @@ public class FlinkTopologyBuilder {
 		this.bolts.put(id, bolt);
 		return declarer;
 	}
-	
+
 	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind
 	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
 	 * achieve proper reliability in the topology.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
 	 * @param bolt
-	 *            the basic bolt
-	 * 
+	 * 		the basic bolt
 	 * @return use the returned object to declare the inputs to this component
 	 */
+	@SuppressWarnings("unused")
 	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
 		return this.setBolt(id, bolt, null);
 	}
-	
+
 	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind
+	 * Define a new bolt in this topology. This defines a basic bolt, which is a simpler-to-use but more restricted kind
 	 * of bolt. Basic bolts are intended for non-aggregation processing and automate the anchoring/acking process to
 	 * achieve proper reliability in the topology.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this bolt's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this bolt's
+	 * 		outputs.
 	 * @param bolt
-	 *            the basic bolt
+	 * 		the basic bolt
 	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
-	 *            process somwehere around the cluster.
-	 * 
+	 * 		the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
 	 * @return use the returned object to declare the inputs to this component
 	 */
 	public BoltDeclarer setBolt(final String id, final IBasicBolt bolt, final Number parallelism_hint) {
 		return this.setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
 	}
-	
+
 	/**
 	 * Define a new spout in this topology.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this spout's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
 	 * @param spout
-	 *            the spout
+	 * 		the spout
 	 */
 	public SpoutDeclarer setSpout(final String id, final IRichSpout spout) {
 		return this.setSpout(id, spout, null);
 	}
-	
+
 	/**
 	 * Define a new spout in this topology with the specified parallelism. If the spout declares itself as
 	 * non-distributed, the parallelism_hint will be ignored and only one task will be allocated to this component.
-	 * 
+	 *
 	 * @param id
-	 *            the id of this component. This id is referenced by other components that want to consume this spout's
-	 *            outputs.
+	 * 		the id of this component. This id is referenced by other components that want to consume this spout's
+	 * 		outputs.
 	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
-	 *            process somwehere around the cluster.
+	 * 		the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a
+	 * 		process somewhere around the cluster.
 	 * @param spout
-	 *            the spout
+	 * 		the spout
 	 */
 	public SpoutDeclarer setSpout(final String id, final IRichSpout spout, final Number parallelism_hint) {
 		final SpoutDeclarer declarer = this.stormBuilder.setSpout(id, spout, parallelism_hint);
 		this.spouts.put(id, spout);
 		return declarer;
 	}
-	
-	// not implemented by Storm 0.9.4
-	// public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
-	// this.stormBuilder.setStateSpout(id, stateSpout);
-	// }
-	// public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
-	// this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
-	// }
-	
+
+	// TODO add StateSpout support (Storm 0.9.4 does not yet support StateSpouts itself)
+	/* not implemented by Storm 0.9.4
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout);
+	 * }
+	 * public void setStateSpout(final String id, final IRichStateSpout stateSpout, final Number parallelism_hint) {
+	 * this.stormBuilder.setStateSpout(id, stateSpout, parallelism_hint);
+	 * }
+	 */
+
 }
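
A minimal usage sketch of the builder above, mirroring Storm's TopologyBuilder API. MySpout, MyBolt, MyCountBolt, and the "word" field are hypothetical user-defined IRichSpout/IRichBolt implementations, not part of this commit:

	final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
	builder.setSpout("source", new MySpout());
	// shuffle grouping is translated to distribute() on the Flink DataStream
	builder.setBolt("tokenizer", new MyBolt(), 2).shuffleGrouping("source");
	// fields grouping is translated to groupBy() on the declared attribute indexes
	builder.setBolt("counter", new MyCountBolt()).fieldsGrouping("tokenizer", new Fields("word"));
	// builds the equivalent Flink streaming program
	final FlinkTopology flinkTopology = builder.createTopology();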

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
index 890f695..a761617 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContext.java
@@ -14,10 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import java.util.Collection;
-import java.util.Map;
+package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.hooks.ITaskHook;
@@ -29,137 +27,135 @@ import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.state.ISubscribedState;
 import backtype.storm.task.TopologyContext;
 
-
-
-
+import java.util.Collection;
+import java.util.Map;
 
 /**
 * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable when
  * a Storm topology is executed within Flink.
  */
 public class FlinkTopologyContext extends TopologyContext {
-	
-	
-	
+
 	/**
 	 * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
 	 * for each parallel task
-	 * 
+	 *
 	 * @param topology
-	 *            The Storm topology that is currently executed
+	 * 		The Storm topology that is currently executed
 	 * @param taskToComponents
-	 *            A map from task IDs to Component IDs
+	 * 		A map from task IDs to Component IDs
 	 * @param taskId
-	 *            The ID of the task the context belongs to.
+	 * 		The ID of the task the context belongs to.
 	 */
 	public FlinkTopologyContext(final StormTopology topology, final Map<Integer, String> taskToComponents,
-		final Integer taskId) {
+			final Integer taskId) {
 		super(topology, null, taskToComponents, null, null, null, null, null, taskId, null, null, null, null, null,
-			null, null);
+				null, null);
 	}
-	
-	
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public void addTaskHook(final ITaskHook hook) {
-		throw new UnsupportedOperationException("Task hooks are not supported by Flink.");
+		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public Collection<ITaskHook> getHooks() {
-		throw new UnsupportedOperationException("Task hooks are not supported by Flink.");
+		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public IMetric getRegisteredMetricByName(final String name) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink.");
-		
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
+
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
+	@SuppressWarnings("rawtypes")
 	@Override
-	public CombinedMetric registerMetric(final String name, @SuppressWarnings("rawtypes") final ICombiner combiner, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+	public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
+	@SuppressWarnings("rawtypes")
 	@Override
-	public ReducedMetric registerMetric(final String name, @SuppressWarnings("rawtypes") final IReducer combiner, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+	public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@SuppressWarnings("unchecked")
 	@Override
 	public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink.");
+		throw new UnsupportedOperationException("Metrics are not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
-		throw new UnsupportedOperationException("Not supported by Flink.");
-		
+		throw new UnsupportedOperationException("Not supported by Flink");
+
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
 	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
-		throw new UnsupportedOperationException("Not supported by Flink.");
+		throw new UnsupportedOperationException("Not supported by Flink");
 	}
-	
+
 	/**
 	 * Not supported by Flink.
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
-	 *             at every invocation
+	 * 		at every invocation
 	 */
 	@Override
-	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T obj) {
-		throw new UnsupportedOperationException("Not supported by Flink.");
+	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId,
+			final T obj) {
+		throw new UnsupportedOperationException("Not supported by Flink");
 	}
-	
+
 }
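
A short sketch of the fail-fast behavior above; topology, taskToComponents, and taskId stand in for the values normally supplied by the wrapper setup code:

	final FlinkTopologyContext context = new FlinkTopologyContext(topology, taskToComponents, taskId);
	try {
		// CountMetric is a standard Storm IMetric implementation
		context.registerMetric("tupleCount", new CountMetric(), 60);
	} catch (final UnsupportedOperationException e) {
		// expected: "Metrics are not supported by Flink"
	}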

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
index f265d4c..5bc4635 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
@@ -14,21 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.topology.IRichSpout;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 import org.apache.flink.util.Collector;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
 /**
 * An {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
  * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -40,93 +36,83 @@ import backtype.storm.topology.IRichSpout;
  */
 public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
 	private static final long serialVersionUID = 4993283609095408765L;
-	
-	/**
-	 * The wrapped Storm {@link IRichSpout spout}.
-	 */
+
+	// The wrapped Storm {@link IRichSpout spout}
 	protected final IRichSpout spout;
-	/**
-	 * Number of attributes of the bolt's output tuples.
-	 */
+	// Number of attributes of the bolt's output tuples
 	private final int numberOfAttributes;
-	/**
-	 * The wrapper of the given Flink collector.
-	 */
+	// The wrapper of the given Flink collector
 	protected StormCollector<OUT> collector;
-	/**
-	 * Indicates, if the source is still running or was canceled;
-	 */
+	// Indicates whether the source is still running or was canceled
 	protected boolean isRunning = true;
-	
-	
-	
+
 	/**
 	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
 	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple1} to
 	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [1;25].
 	 */
+	@SuppressWarnings("unused")
 	public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
 		this(spout, false);
 	}
-	
+
 	/**
 	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
 	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
 	 * {@code rawOutput} is {@code true} and the spout's number of declared output attributes is 1. If {@code rawOutput} is
 	 * {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared
 	 * number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
+	 * 		Set to {@code true} if a single-attribute output stream should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
 		this.spout = spout;
 		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutput);
 	}
-	
-	
-	
+
 	@Override
-	public final void run(@SuppressWarnings("hiding") final Collector<OUT> collector) throws Exception {
+	public final void run(final Collector<OUT> collector) throws Exception {
 		this.collector = new StormCollector<OUT>(this.numberOfAttributes, collector);
 		this.spout.open(null,
-			StormWrapperSetupHelper.convertToTopologyContext((StreamingRuntimeContext)super.getRuntimeContext(), true),
-			new SpoutOutputCollector(this.collector));
+				StormWrapperSetupHelper
+						.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
+				new SpoutOutputCollector(this.collector));
 		this.spout.activate();
 		this.execute();
 	}
-	
+
 	/**
 	 * Needs to be implemented to call the given Spout's {@link IRichSpout#nextTuple() nextTuple()} method. This method
 	 * might use a {@code while(true)}-loop to emit an infinite number of tuples.
 	 */
 	protected abstract void execute();
-	
+
 	/**
 	 * {@inheritDoc}
-	 * 
+	 * <p/>
 	 * Sets the {@link #isRunning} flag to {@code false}.
 	 */
 	@Override
 	public void cancel() {
 		this.isRunning = false;
 	}
-	
+
 	@Override
 	public void close() throws Exception {
 		this.spout.close();
 	}
-	
+
 }
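
To illustrate the execute() contract, a hedged sketch of a concrete subclass (StormSpoutWrapper and StormFiniteSpoutWrapper in this commit follow the same pattern):

	class EndlessSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
		private static final long serialVersionUID = 1L;

		EndlessSpoutWrapper(final IRichSpout spout) {
			super(spout);
		}

		@Override
		protected void execute() {
			// emit tuples until cancel() sets the isRunning flag to false
			while (this.isRunning) {
				this.spout.nextTuple();
			}
		}
	}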

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
index e09f250..3dbc451 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunction.java
@@ -14,17 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
 
-import java.io.Serializable;
+package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
 
-
-
-
+import java.io.Serializable;
 
 /**
 * {@link FlinkDummyRichFunction} has the sole purpose of retrieving the {@link RuntimeContext} for
@@ -32,28 +29,24 @@ import org.apache.flink.configuration.Configuration;
  */
 class FlinkDummyRichFunction implements RichFunction, Serializable {
 	private static final long serialVersionUID = 7992273349877302520L;
-	
-	/**
-	 * The runtime context of a Storm bolt.
-	 */
+
+	// The runtime context of a Storm bolt
 	private RuntimeContext context;
-	
-	
-	
+
 	@Override
 	public void open(final Configuration parameters) throws Exception {/* nothing to do */}
-	
+
 	@Override
 	public void close() throws Exception {/* nothing to do */}
-	
+
 	@Override
 	public RuntimeContext getRuntimeContext() {
 		return this.context;
 	}
-	
+
 	@Override
 	public void setRuntimeContext(final RuntimeContext t) {
 		this.context = t;
 	}
-	
+
 }
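
A sketch of the intended hand-over: the Flink runtime injects the context into every RichFunction it runs, and the dummy merely stores it so the bolt wrapper can retrieve it later (runtimeContext is assumed to be provided by the runtime):

	final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
	// called by the Flink runtime on every RichFunction
	dummy.setRuntimeContext(runtimeContext);
	// the bolt wrapper can now hand the captured context to the Storm bolt
	final RuntimeContext captured = dummy.getRuntimeContext();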