You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:01 UTC

[11/27] flink git commit: [storm-compat] Storm compatibility code cleanup

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
index d322cc5..6a5efdf 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
@@ -14,22 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-
-
-
-
-
 /**
  * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
  * program. It takes the Flink input tuples of type {@code IN} and transforms them into a {@link StormTuple}s that the
@@ -43,95 +39,88 @@ import backtype.storm.topology.IRichBolt;
  */
 public class StormBoltWrapper<IN, OUT> extends StreamOperator<IN, OUT> {
 	private static final long serialVersionUID = -4788589118464155835L;
-	
-	/**
-	 * The wrapped Storm {@link IRichBolt bolt}.
-	 */
+
+	// The wrapped Storm {@link IRichBolt bolt}
 	private final IRichBolt bolt;
-	/**
-	 * Number of attributes of the bolt's output tuples.
-	 */
+	// Number of attributes of the bolt's output tuples
 	private final int numberOfAttributes;
-	
-	
-	
+
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25} depending
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
+	 * can be
+	 * used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
+	 * depending
 	 * on the bolt's declared number of attributes.
-	 * 
+	 *
 	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
+	 * 		The Storm {@link IRichBolt bolt} to be used.
 	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [1;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
 		this(bolt, false);
 	}
-	
+
 	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
+	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it
+	 * can be
 	 * used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
 	 * {@code true} and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
 	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the bolt's declared number of
 	 * attributes.
-	 * 
+	 *
 	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
+	 * 		The Storm {@link IRichBolt bolt} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public StormBoltWrapper(final IRichBolt bolt, final boolean rawOutput) throws IllegalArgumentException {
 		super(new FlinkDummyRichFunction());
 		this.bolt = bolt;
 		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutput);
 	}
-	
-	
-	
+
 	@Override
 	public void open(final Configuration parameters) throws Exception {
 		super.open(parameters);
-		
-		final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext)((FlinkDummyRichFunction)super
-			.getUserFunction()).getRuntimeContext();
-		
+
+		final StreamingRuntimeContext flinkContext = (StreamingRuntimeContext) ((FlinkDummyRichFunction) super
+				.getUserFunction()).getRuntimeContext();
+
 		final TopologyContext topologyContext = StormWrapperSetupHelper.convertToTopologyContext(flinkContext, false);
 		OutputCollector stormCollector = null;
-		
-		if(this.numberOfAttributes != -1) {
+
+		if (this.numberOfAttributes != -1) {
 			stormCollector = new OutputCollector(new StormCollector<OUT>(this.numberOfAttributes, super.collector));
 		}
-		
+
 		this.bolt.prepare(null, topologyContext, stormCollector);
 	}
-	
+
 	@Override
 	public void close() {
 		super.close();
 		this.bolt.cleanup();
 	}
-	
+
 	/**
-	 * {@inheritDoc}
-	 * 
 	 * Transforms a Flink tuple into a Storm tuple and calls the bolt's {@code execute} method.
 	 */
 	@Override
 	protected void callUserFunction() throws Exception {
 		this.bolt.execute(new StormTuple<IN>(this.nextRecord.getObject()));
 	}
-	
+
 	@Override
 	public void run() throws Exception {
-		while(this.readNext() != null) {
+		while (this.readNext() != null) {
 			this.callUserFunctionAndLogException();
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
index ed333b8..1c13e88 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormCollector.java
@@ -14,22 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.wrappers;
 
-import java.util.Collection;
-import java.util.List;
+/* Storm's tuple type
+ * --> "backtype.storm.tuple.Tuple"
+ * is never imported and always written fully qualified, because
+ * --> "org.apache.flink.api.java.tuple.Tuple"
+ * is imported below; this avoids confusion between the two
+ */
 
+import backtype.storm.spout.ISpoutOutputCollector;
+import backtype.storm.task.IOutputCollector;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 import org.apache.flink.util.Collector;
 
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.task.IOutputCollector;
-
-
-
-
+import java.util.Collection;
+import java.util.List;
 
 /**
  * A {@link StormCollector} is used by {@link AbstractStormSpoutWrapper} and {@link StormBoltWrapper} to provided an
@@ -38,119 +41,105 @@ import backtype.storm.task.IOutputCollector;
  * {@link Collector}.
  */
 class StormCollector<OUT> implements ISpoutOutputCollector, IOutputCollector {
-	// we do not import
-	// --> "org.apache.flink.api.java.tuple.Tuple"
-	// or
-	// --> "backtype.storm.tuple.Tuple"
-	// to avoid confusion
-	
-	/**
-	 * The Flink collector.
-	 */
+
+	// The Flink collector
 	private final Collector<OUT> flinkCollector;
-	/**
-	 * The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}.
-	 */
+	// The Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}
 	private final org.apache.flink.api.java.tuple.Tuple outputTuple;
-	/**
-	 * The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple}).
-	 */
+	// The number of attributes of the output tuples. (Determines the concrete type of {@link #outputTuple})
 	private final int numberOfAttributes;
-	/**
-	 * Is set to {@code true} each time a tuple is emitted.
-	 */
+	// Is set to {@code true} each time a tuple is emitted
 	boolean tupleEmitted = false;
-	
-	
-	
+
 	/**
 	 * Instantiates a new {@link StormCollector} that emits Flink tuples to the given Flink collector. If the number of
 	 * attributes is specified as zero, any output type is supported. If the number of attributes is between 1 to 25,
 	 * the output type is {@link Tuple1} to {@link Tuple25}.
-	 * 
+	 *
 	 * @param numberOfAttributes
-	 *            The number of attributes of the emitted tuples.
+	 * 		The number of attributes of the emitted tuples.
 	 * @param flinkCollector
-	 *            The Flink collector to be used.
-	 * 
+	 * 		The Flink collector to be used.
 	 * @throws UnsupportedOperationException
-	 *             if the specified number of attributes is not in the valid range of [0,25]
+	 * 		if the specified number of attributes is not in the valid range of [0,25]
 	 */
 	public StormCollector(final int numberOfAttributes, final Collector<OUT> flinkCollector)
-		throws UnsupportedOperationException {
+			throws UnsupportedOperationException {
 		this.numberOfAttributes = numberOfAttributes;
 		this.flinkCollector = flinkCollector;
-		
-		if(this.numberOfAttributes <= 0) {
+
+		if (this.numberOfAttributes <= 0) {
 			this.outputTuple = null;
-		} else if(this.numberOfAttributes <= 25) {
+		} else if (this.numberOfAttributes <= 25) {
 			try {
 				this.outputTuple = Tuple.getTupleClass(this.numberOfAttributes).newInstance();
-			} catch(final InstantiationException e) {
+			} catch (final InstantiationException e) {
 				throw new RuntimeException(e);
-			} catch(final IllegalAccessException e) {
+			} catch (final IllegalAccessException e) {
 				throw new RuntimeException(e);
 			}
 		} else {
 			throw new UnsupportedOperationException(
-				"SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes
-					+ " are declared by the given bolt.");
+					"SimpleStormBoltWrapper can handle not more then 25 attributes, but " + this.numberOfAttributes
+							+ " are declared by the given bolt");
 		}
 	}
-	
-	
-	
+
 	@Override
 	public void reportError(final Throwable error) {
 		// not sure, if Flink can support this
-		throw new UnsupportedOperationException("Not implemented yet.");
+		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
-		return this.emitImpl(streamId, null, tuple, messageId);
+		return this.emitImpl(tuple);
 	}
-	
+
 	@Override
-	public List<Integer> emit(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
-		return this.emitImpl(streamId, anchors, tuple, null);
+	public List<Integer> emit(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors,
+			final List<Object> tuple) {
+		return this.emitImpl(tuple);
 	}
-	
+
 	@SuppressWarnings("unchecked")
-	public List<Integer> emitImpl(final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple, final Object messageId) {
-		if(this.numberOfAttributes > 0) {
+	public List<Integer> emitImpl(
+			final List<Object> tuple) {
+		if (this.numberOfAttributes > 0) {
 			assert (tuple.size() == this.numberOfAttributes);
-			for(int i = 0; i < this.numberOfAttributes; ++i) {
+			for (int i = 0; i < this.numberOfAttributes; ++i) {
 				this.outputTuple.setField(tuple.get(i), i);
 			}
-			this.flinkCollector.collect((OUT)this.outputTuple);
+			this.flinkCollector.collect((OUT) this.outputTuple);
 		} else {
 			assert (tuple.size() == 1);
-			this.flinkCollector.collect((OUT)tuple.get(0));
+			this.flinkCollector.collect((OUT) tuple.get(0));
 		}
 		this.tupleEmitted = true;
-		
-		return null; // TODO
+
+		// TODO
+		return null;
 	}
-	
+
 	@Override
 	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
 		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 	}
-	
+
 	@Override
-	public void emitDirect(final int taskId, final String streamId, final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
+	public void emitDirect(final int taskId, final String streamId,
+			final Collection<backtype.storm.tuple.Tuple> anchors, final List<Object> tuple) {
 		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 	}
-	
+
 	@Override
 	public void ack(final backtype.storm.tuple.Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink.");
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
 	}
-	
+
 	@Override
 	public void fail(final backtype.storm.tuple.Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink.");
+		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
index f72fcee..ebbf80f 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
@@ -14,17 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.topology.IRichSpout;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
 /**
  * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calles {@link IRichSpout#nextTuple()
  * nextTuple()} for finite number of times before {@link #run(org.apache.flink.util.Collector)} returns. The number of
@@ -33,113 +29,104 @@ import backtype.storm.topology.IRichSpout;
  */
 public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
 	private static final long serialVersionUID = 3883246587044801286L;
-	
-	/**
-	 * The number of {@link IRichSpout#nextTuple()} calls.
-	 */
+
+	// The number of {@link IRichSpout#nextTuple()} calls
 	private int numberOfInvocations;
-	
-	
-	
+
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
 	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
 	 * attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * 
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [1;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
 		this(spout, false, -1);
 	}
-	
+
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
+	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be
+	 * one
 	 * of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @param numberOfInvocations
-	 *            The number of calls to {@link IRichSpout#nextTuple()}.
-	 * 
+	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
 	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [1;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
-		throws IllegalArgumentException {
+			throws IllegalArgumentException {
 		this(spout, false, numberOfInvocations);
 	}
-	
+
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
 	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
 	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to
 	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * 
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
 		this(spout, rawOutput, -1);
 	}
-	
+
 	/**
 	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
 	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
 	 * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
 	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple1} to {@link Tuple25} depending on
 	 * the spout's declared number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @param numberOfInvocations
-	 *            The number of calls to {@link IRichSpout#nextTuple()}.
-	 * 
+	 * 		The number of calls to {@link IRichSpout#nextTuple()}.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public StormFiniteSpoutWrapper(final IRichSpout spout, final boolean rawOutput, final int numberOfInvocations)
-		throws IllegalArgumentException {
+			throws IllegalArgumentException {
 		super(spout, rawOutput);
 		this.numberOfInvocations = numberOfInvocations;
 	}
-	
-	
-	
+
 	/**
 	 * Calls {@link IRichSpout#nextTuple()} for the given number of times.
 	 */
 	@Override
 	protected void execute() {
-		if(this.numberOfInvocations >= 0) {
-			while((--this.numberOfInvocations >= 0) && super.isRunning) {
+		if (this.numberOfInvocations >= 0) {
+			while ((--this.numberOfInvocations >= 0) && super.isRunning) {
 				super.spout.nextTuple();
 			}
 		} else {
 			do {
 				super.collector.tupleEmitted = false;
 				super.spout.nextTuple();
-			} while(super.collector.tupleEmitted && super.isRunning);
+			} while (super.collector.tupleEmitted && super.isRunning);
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
index a9c66ee..6005d6d 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarer.java
@@ -21,61 +21,54 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
 
-
-
-
-
 /**
  * {@link StormOutputFieldsDeclarer} is used by {@link StormBoltWrapper} to determine the number of attributes declared
  * by the wrapped bolt's {@code declare(...)} method.
  */
 class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {
-	/**
-	 * The output schema declared by the wrapped bolt.
-	 */
+
+	// The output schema declared by the wrapped bolt.
 	private Fields outputSchema = null;
-	
-	
-	
+
 	@Override
 	public void declare(final Fields fields) {
 		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
 	}
-	
+
 	@Override
 	public void declare(final boolean direct, final Fields fields) {
 		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
 	}
-	
+
 	@Override
 	public void declareStream(final String streamId, final Fields fields) {
 		this.declareStream(streamId, false, fields);
 	}
-	
+
 	@Override
 	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if(!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
-			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink.");
+		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
+			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
 		}
-		if(direct) {
+		if (direct) {
 			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
 		}
-		
+
 		this.outputSchema = fields;
 	}
-	
+
 	/**
 	 * Returns the number of attributes of the output schema declare by the wrapped bolt. If no output schema is
 	 * declared (eg, for sink bolts), {@code -1} is returned.
-	 * 
+	 *
 	 * @return the number of attributes of the output schema declare by the wrapped bolt
 	 */
 	public int getNumberOfAttributes() {
-		if(this.outputSchema != null) {
+		if (this.outputSchema != null) {
 			return this.outputSchema.size();
 		}
-		
+
 		return -1;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
index 0c7d62d..f5e0733 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
@@ -14,70 +14,65 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.topology.IRichSpout;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple25;
 
-import backtype.storm.topology.IRichSpout;
-
-
-
-
-
 /**
  * A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
  * {@link IRichSpout#nextTuple() nextTuple()} method in in infinite loop.
  */
 public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
 	private static final long serialVersionUID = -218340336648247605L;
-	
+
 	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+	 * can
 	 * be used within a Flink streaming program. The output type will be one of {@link Tuple1} to {@link Tuple25}
 	 * depending on the spout's declared number of attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * 
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [1;25].
+	 * 		If the number of declared output attributes is not within range [1;25].
 	 */
 	public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
 		super(spout, false);
 	}
-	
+
 	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
+	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
+	 * can
 	 * be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
 	 * {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
 	 * output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
 	 * attributes.
-	 * 
+	 *
 	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * 
+	 * 		The Storm {@link IRichSpout spout} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * 
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public StormSpoutWrapper(final IRichSpout spout, final boolean rawOutput) throws IllegalArgumentException {
 		super(spout, rawOutput);
 	}
-	
+
 	/**
 	 * Calls {@link IRichSpout#nextTuple()} in an infinite loop until {@link #cancel()} is called.
 	 */
 	@Override
 	protected void execute() {
-		while(super.isRunning) {
+		while (super.isRunning) {
 			super.spout.nextTuple();
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
index 21f8de4..8019d7d 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
@@ -18,209 +18,202 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
-import java.util.List;
-
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.MessageId;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 
-
-
-
+import java.util.List;
 
 /**
  * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
  */
 class StormTuple<IN> implements Tuple {
-	/**
-	 * The storm representation of the original Flink tuple.
-	 */
+
+	// The storm representation of the original Flink tuple
 	private final Values stormTuple;
-	
-	
-	
+
 	/**
 	 * Create a new Storm tuple from the given Flink tuple.
-	 * 
+	 *
 	 * @param flinkTuple
-	 *            The Flink tuple to be converted.
+	 * 		The Flink tuple to be converted.
 	 */
 	public StormTuple(final IN flinkTuple) {
-		if(flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
-			final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple)flinkTuple;
-			
+		if (flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple) {
+			final org.apache.flink.api.java.tuple.Tuple t = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
+
 			final int numberOfAttributes = t.getArity();
 			this.stormTuple = new Values();
-			for(int i = 0; i < numberOfAttributes; ++i) {
+			for (int i = 0; i < numberOfAttributes; ++i) {
 				this.stormTuple.add(t.getField(i));
 			}
 		} else {
 			this.stormTuple = new Values(flinkTuple);
 		}
 	}
-	
+
 	@Override
 	public int size() {
 		return this.stormTuple.size();
 	}
-	
+
 	@Override
 	public boolean contains(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Fields getFields() {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public int fieldIndex(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public List<Object> select(final Fields selector) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Object getValue(final int i) {
 		return this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public String getString(final int i) {
-		return (String)this.stormTuple.get(i);
+		return (String) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Integer getInteger(final int i) {
-		return (Integer)this.stormTuple.get(i);
+		return (Integer) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Long getLong(final int i) {
-		return (Long)this.stormTuple.get(i);
+		return (Long) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Boolean getBoolean(final int i) {
-		return (Boolean)this.stormTuple.get(i);
+		return (Boolean) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Short getShort(final int i) {
-		return (Short)this.stormTuple.get(i);
+		return (Short) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Byte getByte(final int i) {
-		return (Byte)this.stormTuple.get(i);
+		return (Byte) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Double getDouble(final int i) {
-		return (Double)this.stormTuple.get(i);
+		return (Double) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Float getFloat(final int i) {
-		return (Float)this.stormTuple.get(i);
+		return (Float) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public byte[] getBinary(final int i) {
-		return (byte[])this.stormTuple.get(i);
+		return (byte[]) this.stormTuple.get(i);
 	}
-	
+
 	@Override
 	public Object getValueByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public String getStringByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Integer getIntegerByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Long getLongByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Boolean getBooleanByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Short getShortByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Byte getByteByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Double getDoubleByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public Float getFloatByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public byte[] getBinaryByField(final String field) {
 		throw new UnsupportedOperationException("Not implemented yet");
 	}
-	
+
 	@Override
 	public List<Object> getValues() {
 		return this.stormTuple;
 	}
-	
+
 	@Override
 	public GlobalStreamId getSourceGlobalStreamid() {
 		// not sure if Flink can support this
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public String getSourceComponent() {
 		// not sure if Flink can support this
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public int getSourceTask() {
 		// not sure if Flink can support this
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public String getSourceStreamId() {
 		// not sure if Flink can support this
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public MessageId getMessageId() {
 		// not sure if Flink can support this
 		throw new UnsupportedOperationException();
 	}
-	
+
 	@Override
 	public int hashCode() {
 		final int prime = 31;
@@ -228,27 +221,27 @@ class StormTuple<IN> implements Tuple {
 		result = (prime * result) + ((this.stormTuple == null) ? 0 : this.stormTuple.hashCode());
 		return result;
 	}
-	
+
 	@Override
 	public boolean equals(final Object obj) {
-		if(this == obj) {
+		if (this == obj) {
 			return true;
 		}
-		if(obj == null) {
+		if (obj == null) {
 			return false;
 		}
-		if(this.getClass() != obj.getClass()) {
+		if (this.getClass() != obj.getClass()) {
 			return false;
 		}
-		final StormTuple<?> other = (StormTuple<?>)obj;
-		if(this.stormTuple == null) {
-			if(other.stormTuple != null) {
+		final StormTuple<?> other = (StormTuple<?>) obj;
+		if (this.stormTuple == null) {
+			if (other.stormTuple != null) {
 				return false;
 			}
-		} else if(!this.stormTuple.equals(other.stormTuple)) {
+		} else if (!this.stormTuple.equals(other.stormTuple)) {
 			return false;
 		}
 		return true;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
index ecbbe5c..a8f99e6 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
@@ -14,15 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.generated.Bolt;
 import backtype.storm.generated.ComponentCommon;
@@ -32,84 +25,90 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.IComponent;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.IRichSpout;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.stormcompatibility.api.FlinkTopologyContext;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
-
-
-
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} or
  * {@link StormBoltWrapper}.
  */
 class StormWrapperSetupHelper {
-	
+
 	/**
-	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link StormBoltWrapper}.
-	 * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link Tuple25}
+	 * Computes the number of output attributes used by a {@link AbstractStormSpoutWrapper} or {@link
+	 * StormBoltWrapper}.
+	 * Returns zero for raw output type or a value within range [1;25] for output type {@link Tuple1} to {@link
+	 * Tuple25}
 	 * . In case of a data sink, {@code -1} is returned. .
-	 * 
+	 *
 	 * @param spoutOrBolt
-	 *            The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
+	 * 		The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
 	 * @param rawOutput
-	 *            Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
+	 * 		Set to {@code true} if a single attribute output stream, should not be of type {@link Tuple1} but be
+	 * 		of a raw type.
 	 * @return The number of attributes to be used.
 	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
+	 * 		If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
+	 * 		{@code rawOutput} is {@code false} and the number of declared output attributes is not within range
+	 * 		[1;25].
 	 */
 	public static int getNumberOfAttributes(final IComponent spoutOrBolt, final boolean rawOutput)
-		throws IllegalArgumentException {
+			throws IllegalArgumentException {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		spoutOrBolt.declareOutputFields(declarer);
-		
+
 		final int declaredNumberOfAttributes = declarer.getNumberOfAttributes();
-		
-		if(declaredNumberOfAttributes == -1) {
+
+		if (declaredNumberOfAttributes == -1) {
 			return -1;
 		}
-		
-		if((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
+
+		if ((declaredNumberOfAttributes < 1) || (declaredNumberOfAttributes > 25)) {
 			throw new IllegalArgumentException(
-				"Provided bolt declares non supported number of output attributes. Must be in range [1;25] but was "
-					+ declaredNumberOfAttributes);
+					"Provided bolt declares non supported number of output attributes. Must be in range [1;25] but " +
+							"was "
+							+ declaredNumberOfAttributes);
 		}
-		
-		if(rawOutput) {
-			if(declaredNumberOfAttributes > 1) {
+
+		if (rawOutput) {
+			if (declaredNumberOfAttributes > 1) {
 				throw new IllegalArgumentException(
-					"Ouput type is requested to be raw type, but provided bolt declares more then one output attribute.");
-				
+						"Ouput type is requested to be raw type, but provided bolt declares more then one output " +
+								"attribute.");
+
 			}
 			return 0;
 		}
-		
+
 		return declaredNumberOfAttributes;
 	}
-	
-	/**
-	 * TODO
-	 */
-	public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context, final boolean spoutOrBolt) {
-		final Integer taskId = new Integer(1 + context.getIndexOfThisSubtask());
-		
+
+	// TODO
+	public static TopologyContext convertToTopologyContext(final StreamingRuntimeContext context,
+			final boolean spoutOrBolt) {
+		final Integer taskId = 1 + context.getIndexOfThisSubtask();
+
 		final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
 		taskToComponents.put(taskId, context.getTaskName());
-		
+
 		final ComponentCommon common = new ComponentCommon();
 		common.set_parallelism_hint(context.getNumberOfParallelSubtasks());
-		
+
 		final Map<String, Bolt> bolts = new HashMap<String, Bolt>();
 		final Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
-		
-		if(spoutOrBolt) {
+
+		if (spoutOrBolt) {
 			spoutSpecs.put(context.getTaskName(), new SpoutSpec(null, common));
 		} else {
 			bolts.put(context.getTaskName(), new Bolt(null, common));
 		}
-		
+
 		return new FlinkTopologyContext(new StormTopology(spoutSpecs, bolts, null), taskToComponents, taskId);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
index 1902835..b55e1ef 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarerTest.java
@@ -14,164 +14,159 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.api;
 
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.util.LinkedList;
 
 public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-	
+
 	@Test
 	public void testDeclare() {
-		for(int i = 0; i < 4; ++i) {
-			for(int j = 0; j <= 25; ++j) {
+		for (int i = 0; i < 4; ++i) {
+			for (int j = 0; j <= 25; ++j) {
 				this.runDeclareTest(i, j);
 			}
 		}
 	}
-	
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareSimpleToManyAttributes() {
 		this.runDeclareTest(0, 26);
 	}
-	
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareNonDirectToManyAttributes() {
 		this.runDeclareTest(1, 26);
 	}
-	
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareDefaultStreamToManyAttributes() {
 		this.runDeclareTest(2, 26);
 	}
-	
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testDeclareFullToManyAttributes() {
 		this.runDeclareTest(3, 26);
 	}
-	
+
 	private void runDeclareTest(final int testCase, final int numberOfAttributes) {
 		final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
-		
+
 		final String[] attributes = new String[numberOfAttributes];
-		for(int i = 0; i < numberOfAttributes; ++i) {
+		for (int i = 0; i < numberOfAttributes; ++i) {
 			attributes[i] = "a" + i;
 		}
-		
-		switch(testCase) {
-		case 0:
-			this.declareSimple(declarere, attributes);
-			break;
-		case 1:
-			this.declareNonDirect(declarere, attributes);
-			break;
-		case 2:
-			this.declareDefaultStream(declarere, attributes);
-			break;
-		default:
-			this.declareFull(declarere, attributes);
+
+		switch (testCase) {
+			case 0:
+				this.declareSimple(declarere, attributes);
+				break;
+			case 1:
+				this.declareNonDirect(declarere, attributes);
+				break;
+			case 2:
+				this.declareDefaultStream(declarere, attributes);
+				break;
+			default:
+				this.declareFull(declarere, attributes);
 		}
-		
-		
+
 		final TypeInformation<?> type = declarere.getOutputType();
-		
-		if(numberOfAttributes == 0) {
+
+		if (numberOfAttributes == 0) {
 			Assert.assertNull(type);
 		} else {
 			Assert.assertEquals(numberOfAttributes, type.getArity());
-			if(numberOfAttributes == 1) {
+			if (numberOfAttributes == 1) {
 				Assert.assertFalse(type.isTupleType());
 			} else {
 				Assert.assertTrue(type.isTupleType());
 			}
 		}
 	}
-	
+
 	private void declareSimple(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
 		declarere.declare(new Fields(attributes));
 	}
-	
+
 	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
 		declarere.declare(false, new Fields(attributes));
 	}
-	
+
 	private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
 		declarere.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
 	}
-	
+
 	private void declareFull(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
 		declarere.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareDirect() {
 		new FlinkOutputFieldsDeclarer().declare(true, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareNonDefaultStrem() {
 		new FlinkOutputFieldsDeclarer().declareStream("dummy", null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareDirect2() {
 		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareNonDefaultStrem2() {
 		new FlinkOutputFieldsDeclarer().declareStream("dummy", this.r.nextBoolean(), null);
 	}
-	
+
 	@Test
 	public void testGetGroupingFieldIndexes() {
 		final int numberOfAttributes = 5 + this.r.nextInt(21);
 		final String[] attributes = new String[numberOfAttributes];
-		for(int i = 0; i < numberOfAttributes; ++i) {
+		for (int i = 0; i < numberOfAttributes; ++i) {
 			attributes[i] = "a" + i;
 		}
-		
+
 		final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
 		declarere.declare(new Fields(attributes));
-		
+
 		final int numberOfKeys = 1 + this.r.nextInt(25);
 		final LinkedList<String> groupingFields = new LinkedList<String>();
 		final boolean[] indexes = new boolean[numberOfAttributes];
-		
-		for(int i = 0; i < numberOfAttributes; ++i) {
-			if(this.r.nextInt(26) < numberOfKeys) {
+
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (this.r.nextInt(26) < numberOfKeys) {
 				groupingFields.add(attributes[i]);
 				indexes[i] = true;
 			} else {
 				indexes[i] = false;
 			}
 		}
-		
+
 		final int[] expectedResult = new int[groupingFields.size()];
 		int j = 0;
-		for(int i = 0; i < numberOfAttributes; ++i) {
-			if(indexes[i]) {
+		for (int i = 0; i < numberOfAttributes; ++i) {
+			if (indexes[i]) {
 				expectedResult[j++] = i;
 			}
 		}
-		
+
 		final int[] result = declarere.getGroupingFieldIndexes(groupingFields);
-		
+
 		Assert.assertEquals(expectedResult.length, result.length);
-		for(int i = 0; i < expectedResult.length; ++i) {
+		for (int i = 0; i < expectedResult.length; ++i) {
 			Assert.assertEquals(expectedResult[i], result[i]);
 		}
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
index 029a400..d214610 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyContextTest.java
@@ -14,65 +14,61 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.api;
 
-import org.junit.Test;
+package org.apache.flink.stormcompatibility.api;
 
 import backtype.storm.metric.api.ICombiner;
 import backtype.storm.metric.api.IMetric;
 import backtype.storm.metric.api.IReducer;
-
-
-
-
+import org.junit.Test;
 
 public class FlinkTopologyContextTest {
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testAddTaskHook() {
 		new FlinkTopologyContext(null, null, null).addTaskHook(null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetHooks() {
 		new FlinkTopologyContext(null, null, null).getHooks();
 	}
-	
+
 	@SuppressWarnings("rawtypes")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testRegisteredMetric1() {
-		new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner)null, 0);
+		new FlinkTopologyContext(null, null, null).registerMetric(null, (ICombiner) null, 0);
 	}
-	
+
 	@SuppressWarnings("rawtypes")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testRegisteredMetric2() {
-		new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer)null, 0);
+		new FlinkTopologyContext(null, null, null).registerMetric(null, (IReducer) null, 0);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testRegisteredMetric3() {
-		new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric)null, 0);
+		new FlinkTopologyContext(null, null, null).registerMetric(null, (IMetric) null, 0);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testGetRegisteredMetricByName() {
 		new FlinkTopologyContext(null, null, null).getRegisteredMetricByName(null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testSetAllSubscribedState() {
 		new FlinkTopologyContext(null, null, null).setAllSubscribedState(null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testSetSubscribedState1() {
 		new FlinkTopologyContext(null, null, null).setSubscribedState(null, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testSetSubscribedState2() {
 		new FlinkTopologyContext(null, null, null).setSubscribedState(null, null, null);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
index 4f1f40f..f179919 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
@@ -14,52 +14,49 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.stormcompatibility.api;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-
-
-
-
 public class FlinkTopologyTest {
-	
+
 	@Test
 	public void testDefaultParallelism() {
 		final FlinkTopology topology = new FlinkTopology(null);
 		Assert.assertEquals(1, topology.getParallelism());
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testExecute() throws Exception {
 		new FlinkTopology(null).execute();
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testExecuteWithName() throws Exception {
 		new FlinkTopology(null).execute(null);
 	}
-	
+
 	@Test
 	public void testNumberOfTasks() {
 		final FlinkTopology topology = new FlinkTopology(null);
-		
+
 		Assert.assertEquals(0, topology.getNumberOfTasks());
-		
+
 		topology.increaseNumberOfTasks(3);
 		Assert.assertEquals(3, topology.getNumberOfTasks());
-		
+
 		topology.increaseNumberOfTasks(2);
 		Assert.assertEquals(5, topology.getNumberOfTasks());
-		
+
 		topology.increaseNumberOfTasks(8);
 		Assert.assertEquals(13, topology.getNumberOfTasks());
 	}
-	
+
 	@Test(expected = AssertionError.class)
 	public void testAssert() {
 		new FlinkTopology(null).increaseNumberOfTasks(0);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
index 100ca87..94a50cf 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
@@ -14,29 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.util;
 
-import java.util.Random;
+package org.apache.flink.stormcompatibility.util;
 
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
-
-
+import java.util.Random;
 
 public abstract class AbstractTest {
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
-	
+
 	protected long seed;
 	protected Random r;
-	
+
 	@Before
 	public void prepare() {
 		this.seed = System.currentTimeMillis();
 		this.r = new Random(this.seed);
 		LOG.info("Test seed: {}", new Long(this.seed));
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
index 52136a9..4182ad0 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteTestSpout.java
@@ -14,9 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
 
-import java.util.Map;
+package org.apache.flink.stormcompatibility.wrappers;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
@@ -25,59 +24,55 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
 
-
-
-
+import java.util.Map;
 
 class FiniteTestSpout implements IRichSpout {
 	private static final long serialVersionUID = 7992419478267824279L;
-	
+
 	private int numberOfOutputTuples;
 	private SpoutOutputCollector collector;
-	
-	
-	
+
 	public FiniteTestSpout(final int numberOfOutputTuples) {
 		this.numberOfOutputTuples = numberOfOutputTuples;
 	}
-	
-	
-	
+
+	@SuppressWarnings("rawtypes")
 	@Override
-	public void open(@SuppressWarnings("rawtypes") final Map conf, final TopologyContext context, @SuppressWarnings("hiding") final SpoutOutputCollector collector) {
+	public void open(final Map conf, final TopologyContext context,
+			final SpoutOutputCollector collector) {
 		this.collector = collector;
 	}
-	
+
 	@Override
 	public void close() {/* nothing to do */}
-	
+
 	@Override
 	public void activate() {/* nothing to do */}
-	
+
 	@Override
 	public void deactivate() {/* nothing to do */}
-	
+
 	@Override
 	public void nextTuple() {
-		if(--this.numberOfOutputTuples >= 0) {
-			this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
+		if (--this.numberOfOutputTuples >= 0) {
+			this.collector.emit(new Values(this.numberOfOutputTuples));
 		}
 	}
-	
+
 	@Override
 	public void ack(final Object msgId) {/* nothing to do */}
-	
+
 	@Override
 	public void fail(final Object msgId) {/* nothing to do */}
-	
+
 	@Override
 	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
 		declarer.declare(new Fields("dummy"));
 	}
-	
+
 	@Override
 	public Map<String, Object> getComponentConfiguration() {
 		return null;
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
index 7ecc98c..2c2a221 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/FlinkDummyRichFunctionTest.java
@@ -14,28 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
 
-import static org.mockito.Mockito.mock;
+package org.apache.flink.stormcompatibility.wrappers;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.junit.Assert;
 import org.junit.Test;
 
-
-
-
+import static org.mockito.Mockito.mock;
 
 public class FlinkDummyRichFunctionTest {
-	
+
 	@Test
 	public void testRuntimeContext() {
 		final FlinkDummyRichFunction dummy = new FlinkDummyRichFunction();
-		
+
 		final RuntimeContext context = mock(RuntimeContext.class);
 		dummy.setRuntimeContext(context);
-		
+
 		Assert.assertSame(context, dummy.getRuntimeContext());
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
index 50ae763..f2cfe59 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapperTest.java
@@ -17,16 +17,10 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Map;
-
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.tuple.Fields;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
@@ -41,177 +35,168 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-
-
+import java.util.Map;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({StreamRecordSerializer.class, StormWrapperSetupHelper.class})
 public class StormBoltWrapperTest {
-	
-	@SuppressWarnings("unused")
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperRawType() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy1", "dummy2"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), true);
 	}
-	
-	@SuppressWarnings("unused")
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes1() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		final String[] schema = new String[26];
-		for(int i = 0; i < schema.length; ++i) {
+		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class));
 	}
-	
-	@SuppressWarnings("unused")
+
 	@Test(expected = IllegalArgumentException.class)
 	public void testWrapperToManyAttributes2() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		final String[] schema = new String[26];
-		for(int i = 0; i < schema.length; ++i) {
+		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		new StormBoltWrapper<Object, Object>(mock(IRichBolt.class), false);
 	}
-	
+
 	@Test
 	public void testWrapper() throws Exception {
-		for(int i = 0; i < 26; ++i) {
+		for (int i = 0; i < 26; ++i) {
 			this.testWrapper(i);
 		}
 	}
-	
+
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	private void testWrapper(final int numberOfAttributes) throws Exception {
 		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-		
+
 		Tuple flinkTuple = null;
 		String rawTuple = null;
-		
-		if(numberOfAttributes == 0) {
-			rawTuple = new String("test");
+
+		if (numberOfAttributes == 0) {
+			rawTuple = "test";
 		} else {
 			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
 		}
-		
+
 		String[] schema = new String[numberOfAttributes];
-		if(numberOfAttributes == 0) {
+		if (numberOfAttributes == 0) {
 			schema = new String[1];
 		}
-		for(int i = 0; i < schema.length; ++i) {
+		for (int i = 0; i < schema.length; ++i) {
 			schema[i] = "a" + i;
 		}
-		
-		
+
 		final StreamRecord record = mock(StreamRecord.class);
-		if(numberOfAttributes == 0) {
+		if (numberOfAttributes == 0) {
 			when(record.getObject()).thenReturn(rawTuple);
 		} else {
 			when(record.getObject()).thenReturn(flinkTuple);
 		}
-		
+
 		final StreamRecordSerializer serializer = mock(StreamRecordSerializer.class);
 		when(serializer.createInstance()).thenReturn(record);
-		
+
 		final IndexedReaderIterator reader = mock(IndexedReaderIterator.class);
 		when(reader.next(record)).thenReturn(record).thenReturn(null);
-		
+
 		final StreamTaskContext taskContext = mock(StreamTaskContext.class);
 		when(taskContext.getInputSerializer(0)).thenReturn(serializer);
 		when(taskContext.getIndexedInput(0)).thenReturn(reader);
-		
-		
-		
+
 		final IRichBolt bolt = mock(IRichBolt.class);
-		
+
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields(schema));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		final StormBoltWrapper wrapper = new StormBoltWrapper(bolt);
 		wrapper.setup(taskContext);
-		
-		
-		
+
 		wrapper.callUserFunction();
-		if(numberOfAttributes == 0) {
+		if (numberOfAttributes == 0) {
 			verify(bolt).execute(eq(new StormTuple<String>(rawTuple)));
 		} else {
 			verify(bolt).execute(eq(new StormTuple<Tuple>(flinkTuple)));
 		}
-		
-		
-		
+
 		wrapper.run();
-		if(numberOfAttributes == 0) {
+		if (numberOfAttributes == 0) {
 			verify(bolt, times(2)).execute(eq(new StormTuple<String>(rawTuple)));
 		} else {
 			verify(bolt, times(2)).execute(eq(new StormTuple<Tuple>(flinkTuple)));
 		}
 	}
-	
+
 	@Test
 	public void testOpen() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
-		
+
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		wrapper.open(mock(Configuration.class));
-		
+
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), any(OutputCollector.class));
 	}
-	
+
 	@Test
 	public void testOpenSink() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
 		wrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		wrapper.open(mock(Configuration.class));
-		
+
 		verify(bolt).prepare(any(Map.class), any(TopologyContext.class), isNull(OutputCollector.class));
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	@Test
 	public void testClose() throws Exception {
 		final IRichBolt bolt = mock(IRichBolt.class);
-		
+
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		final StormBoltWrapper<Object, Object> wrapper = new StormBoltWrapper<Object, Object>(bolt);
-		
+
 		final StreamTaskContext<Object> taskContext = mock(StreamTaskContext.class);
 		when(taskContext.getOutputCollector()).thenReturn(mock(Collector.class));
 		wrapper.setup(taskContext);
-		
+
 		wrapper.close();
 		verify(bolt).cleanup();
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
index 660fcea..925da04 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormCollectorTest.java
@@ -17,109 +17,104 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import java.util.Collection;
-import java.util.List;
-
+import backtype.storm.tuple.Values;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Test;
 
-import backtype.storm.tuple.Values;
-
-
-
+import java.util.Collection;
+import java.util.List;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class StormCollectorTest extends AbstractTest {
-	
+
 	@Test
 	public void testSpoutStormCollector() throws InstantiationException, IllegalAccessException {
-		for(int i = 0; i < 26; ++i) {
+		for (int i = 0; i < 26; ++i) {
 			this.testStromCollector(true, i);
 		}
 	}
-	
+
 	@Test
 	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
-		for(int i = 0; i < 26; ++i) {
+		for (int i = 0; i < 26; ++i) {
 			this.testStromCollector(false, i);
 		}
 	}
-	
-	@SuppressWarnings({"unchecked", "rawtypes"})
+
+	@SuppressWarnings({"rawtypes", "unchecked"})
 	private void testStromCollector(final boolean spoutTest, final int numberOfAttributes)
-		throws InstantiationException, IllegalAccessException {
+			throws InstantiationException, IllegalAccessException {
 		assert ((0 <= numberOfAttributes) && (numberOfAttributes <= 25));
-		
+
 		final Collector flinkCollector = mock(Collector.class);
 		Tuple flinkTuple = null;
 		final Values tuple = new Values();
-		
-		StormCollector<?> collector = null;
-		
-		if(numberOfAttributes == 0) {
+
+		StormCollector<?> collector;
+
+		if (numberOfAttributes == 0) {
 			collector = new StormCollector(numberOfAttributes, flinkCollector);
-			tuple.add(new Integer(this.r.nextInt()));
-			
+			tuple.add(this.r.nextInt());
+
 		} else {
 			collector = new StormCollector(numberOfAttributes, flinkCollector);
 			flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-			
-			for(int i = 0; i < numberOfAttributes; ++i) {
-				tuple.add(new Integer(this.r.nextInt()));
+
+			for (int i = 0; i < numberOfAttributes; ++i) {
+				tuple.add(this.r.nextInt());
 				flinkTuple.setField(tuple.get(i), i);
 			}
 		}
-		
+
 		final String streamId = "streamId";
 		final Collection anchors = mock(Collection.class);
 		final List<Integer> taskIds;
-		final Object messageId = new Integer(this.r.nextInt());
-		if(spoutTest) {
+		final Object messageId = this.r.nextInt();
+		if (spoutTest) {
 			taskIds = collector.emit(streamId, tuple, messageId);
 		} else {
 			taskIds = collector.emit(streamId, anchors, tuple);
 		}
-		
+
 		Assert.assertNull(taskIds);
-		
-		if(numberOfAttributes == 0) {
+
+		if (numberOfAttributes == 0) {
 			verify(flinkCollector).collect(tuple.get(0));
 		} else {
 			verify(flinkCollector).collect(flinkTuple);
 		}
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testReportError() {
 		new StormCollector<Object>(1, null).reportError(null);
 	}
-	
+
 	@SuppressWarnings({"rawtypes", "unchecked"})
 	@Test(expected = UnsupportedOperationException.class)
 	public void testBoltEmitDirect() {
-		new StormCollector<Object>(1, null).emitDirect(0, (String)null, (Collection)null, (List)null);
+		new StormCollector<Object>(1, null).emitDirect(0, null, (Collection) null, null);
 	}
-	
-	@SuppressWarnings({"rawtypes", "unchecked"})
+
+	@SuppressWarnings("unchecked")
 	@Test(expected = UnsupportedOperationException.class)
 	public void testSpoutEmitDirect() {
-		new StormCollector<Object>(1, null).emitDirect(0, (String)null, (List)null, (Object)null);
+		new StormCollector<Object>(1, null).emitDirect(0, null, null, (Object) null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testAck() {
 		new StormCollector<Object>(1, null).ack(null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testFail() {
 		new StormCollector<Object>(1, null).fail(null);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
index 3c4f5aa..0c5b124 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapperTest.java
@@ -14,14 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.tuple.Fields;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -32,80 +29,79 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-
-
-
+import java.util.LinkedList;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormFiniteSpoutWrapperTest extends AbstractTest {
-	
+
 	@Test
 	public void testRunExecuteFixedNumber() throws Exception {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
 		declarer.declare(new Fields("dummy"));
 		PowerMockito.whenNew(StormOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);
-		
+
 		final IRichSpout spout = mock(IRichSpout.class);
 		final int numberOfCalls = this.r.nextInt(50);
 		final StormFiniteSpoutWrapper<?> spoutWrapper = new StormFiniteSpoutWrapper<Object>(spout, numberOfCalls);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		spoutWrapper.run(null);
 		verify(spout, times(numberOfCalls)).nextTuple();
 	}
-	
+
 	@Test
 	public void testRunExecute() throws Exception {
 		final int numberOfCalls = this.r.nextInt(50);
-		
+
 		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
-		for(int i = numberOfCalls - 1; i >= 0; --i) {
-			expectedResult.add(new Tuple1<Integer>(new Integer(i)));
+		for (int i = numberOfCalls - 1; i >= 0; --i) {
+			expectedResult.add(new Tuple1<Integer>(i));
 		}
-		
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-			spout);
+				spout);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		final TestCollector collector = new TestCollector();
 		spoutWrapper.run(collector);
-		
+
 		Assert.assertEquals(expectedResult, collector.result);
 	}
-	
+
 	@Test
 	public void testCancel() throws Exception {
 		final int numberOfCalls = 5 + this.r.nextInt(5);
-		
+
 		final LinkedList<Tuple1<Integer>> expectedResult = new LinkedList<Tuple1<Integer>>();
-		expectedResult.add(new Tuple1<Integer>(new Integer(numberOfCalls - 1)));
-		
+		expectedResult.add(new Tuple1<Integer>(numberOfCalls - 1));
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-			spout);
+				spout);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		spoutWrapper.cancel();
 		final TestCollector collector = new TestCollector();
 		spoutWrapper.run(collector);
-		
+
 		Assert.assertEquals(expectedResult, collector.result);
 	}
-	
+
 	@Test
 	public void testClose() throws Exception {
 		final IRichSpout spout = mock(IRichSpout.class);
 		final StormFiniteSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormFiniteSpoutWrapper<Tuple1<Integer>>(
-			spout);
-		
+				spout);
+
 		spoutWrapper.close();
-		
+
 		verify(spout).close();
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
index 5a4c05b..a72eb19 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormOutputFieldsDeclarerTest.java
@@ -17,66 +17,64 @@
 
 package org.apache.flink.stormcompatibility.wrappers;
 
-import java.util.ArrayList;
-
+import backtype.storm.tuple.Fields;
+import backtype.storm.utils.Utils;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.junit.Assert;
 import org.junit.Test;
 
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-
-
-
+import java.util.ArrayList;
 
 public class StormOutputFieldsDeclarerTest extends AbstractTest {
-	
+
 	@Test
 	public void testDeclare() {
 		final StormOutputFieldsDeclarer declarer = new StormOutputFieldsDeclarer();
-		
+
 		Assert.assertEquals(-1, declarer.getNumberOfAttributes());
-		
+
 		final int numberOfAttributes = 1 + this.r.nextInt(25);
 		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
-		for(int i = 0; i < numberOfAttributes; ++i) {
+		for (int i = 0; i < numberOfAttributes; ++i) {
 			schema.add("a" + i);
 		}
 		declarer.declare(new Fields(schema));
 		Assert.assertEquals(numberOfAttributes, declarer.getNumberOfAttributes());
 	}
-	
+
+	@SuppressWarnings("unused")
 	public void testDeclareDirect() {
 		new StormOutputFieldsDeclarer().declare(false, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareDirectFail() {
 		new StormOutputFieldsDeclarer().declare(true, null);
 	}
-	
+
+	@SuppressWarnings("unused")
 	public void testDeclareStream() {
 		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareStreamFail() {
 		new StormOutputFieldsDeclarer().declareStream(null, null);
 	}
-	
+
+	@SuppressWarnings("unused")
 	public void testDeclareFullStream() {
 		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareFullStreamFailNonDefaultStream() {
 		new StormOutputFieldsDeclarer().declareStream(null, false, null);
 	}
-	
+
 	@Test(expected = UnsupportedOperationException.class)
 	public void testDeclareFullStreamFailDirect() {
 		new StormOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
 	}
-	
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e497a831/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
index 88d855c..48f680a 100644
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
+++ b/flink-staging/flink-streaming/flink-storm-compatibility/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapperTest.java
@@ -14,13 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 
-import java.util.LinkedList;
+package org.apache.flink.stormcompatibility.wrappers;
 
+import backtype.storm.topology.IRichSpout;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.stormcompatibility.util.AbstractTest;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -30,39 +27,38 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
-import backtype.storm.topology.IRichSpout;
-
-
-
+import java.util.LinkedList;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(StormWrapperSetupHelper.class)
 public class StormSpoutWrapperTest extends AbstractTest {
-	
+
 	@Test
 	public void testRunExecuteCancelInfinite() throws Exception {
 		final int numberOfCalls = 5 + this.r.nextInt(5);
-		
+
 		final IRichSpout spout = new FiniteTestSpout(numberOfCalls);
 		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
 		spoutWrapper.setRuntimeContext(mock(StreamingRuntimeContext.class));
-		
+
 		spoutWrapper.cancel();
 		final TestCollector collector = new TestCollector();
 		spoutWrapper.run(collector);
-		
+
 		Assert.assertEquals(new LinkedList<Tuple1<Integer>>(), collector.result);
 	}
-	
+
 	@Test
 	public void testClose() throws Exception {
 		final IRichSpout spout = mock(IRichSpout.class);
 		final StormSpoutWrapper<Tuple1<Integer>> spoutWrapper = new StormSpoutWrapper<Tuple1<Integer>>(spout);
-		
+
 		spoutWrapper.close();
-		
+
 		verify(spout).close();
 	}
-	
+
 }