You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:36 UTC

[11/15] flink git commit: [Storm Compatibility] Maven module restructuring and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
deleted file mode 100644
index 300b241..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutWrapper.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped spout's
- * {@link IRichSpout#nextTuple() nextTuple()} method in an infinite loop.
- */
-public class StormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
-	private static final long serialVersionUID = -218340336648247605L;
-
-	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
-	 * be used within a Flink streaming program. No raw output streams are requested, ie, the output type will be one
-	 * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not within range [0;25].
-	 */
-	public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		super(spout, null);
-	}
-
-	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
-	 * be used within a Flink streaming program. For every stream listed in {@code rawOutputs} the output is requested
-	 * as a raw type instead of {@link Tuple1}; such a stream must declare exactly one output attribute. For all other
-	 * streams the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type. (Can be {@code null}; duplicates are collapsed.)
-	 * @throws IllegalArgumentException
-	 *             If a stream listed in {@code rawOutputs} does not declare exactly one output attribute, or if the
-	 *             number of declared output attributes of any stream is not within range [0;25].
-	 */
-	public StormSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
-			throws IllegalArgumentException {
-		// Sets.newHashSet(E...) dereferences the array, so a null argument has to be passed on as null explicitly
-		super(spout, rawOutputs == null ? null : Sets.newHashSet(rawOutputs));
-	}
-
-	/**
-	 * Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it can
-	 * be used within a Flink streaming program. For every stream listed in {@code rawOutputs} the output is requested
-	 * as a raw type instead of {@link Tuple1}; such a stream must declare exactly one output attribute. For all other
-	 * streams the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type. (Can be {@code null}.)
-	 * @throws IllegalArgumentException
-	 *             If a stream listed in {@code rawOutputs} does not declare exactly one output attribute, or if the
-	 *             number of declared output attributes of any stream is not within range [0;25].
-	 */
-	public StormSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
-		super(spout, rawOutputs);
-	}
-
-	/**
-	 * Calls {@link IRichSpout#nextTuple()} in an infinite loop until {@link #cancel()} is called.
-	 */
-	@Override
-	protected void execute() {
-		while (super.isRunning) {
-			super.spout.nextTuple();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
deleted file mode 100644
index 07d94b4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormTuple.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-/*
- * We do neither import
- * 		backtype.storm.tuple.Tuple;
- * nor
- * 		org.apache.flink.api.java.tuple.Tuple
- * to avoid confusion
- */
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Values;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.List;
-
-/**
- * {@link StormTuple} converts a Flink tuple of type {@code IN} into a Storm tuple.
- */
-class StormTuple<IN> implements backtype.storm.tuple.Tuple {
-
-	/** The Storm representation of the original Flink tuple */
-	private final Values stormTuple;
-	/** The schema (ie, ordered field names) of the tuple */
-	private final Fields schema;
-
-	/**
-	 * Creates a new Storm tuple from the given Flink input. If the input is a Flink
-	 * {@link org.apache.flink.api.java.tuple.Tuple Tuple}, its fields are copied in order and {@code schema} is kept.
-	 * Any other (raw) input becomes a single-value Storm tuple; the provided {@code schema} is ignored in that case
-	 * and stored as {@code null}.
-	 * 
-	 * @param flinkTuple
-	 * 		The Flink tuple (or raw value) to be converted.
-	 * @param schema
-	 * 		The schema (ie, ordered field names) of the tuple.
-	 */
-	public StormTuple(final IN flinkTuple, final Fields schema) {
-		if (!(flinkTuple instanceof org.apache.flink.api.java.tuple.Tuple)) {
-			// raw input: wrap the value as the only attribute, no schema is kept
-			this.schema = null;
-			this.stormTuple = new Values(flinkTuple);
-			return;
-		}
-
-		final org.apache.flink.api.java.tuple.Tuple tuple = (org.apache.flink.api.java.tuple.Tuple) flinkTuple;
-		final Values converted = new Values();
-		final int arity = tuple.getArity();
-		int pos = 0;
-		while (pos < arity) {
-			converted.add(tuple.getField(pos));
-			++pos;
-		}
-
-		this.schema = schema;
-		this.stormTuple = converted;
-	}
-
-	@Override
-	public int size() {
-		return this.stormTuple.size();
-	}
-
-	/**
-	 * Checks whether the tuple has an attribute with the given name. With a schema, the schema is consulted. Without a
-	 * schema (raw input), the single wrapped object is inspected for a public member field of that name and, only if
-	 * no such field exists, for a matching {@code getXxx()} method. Any other failure yields {@code false}.
-	 */
-	@Override
-	public boolean contains(final String field) {
-		if (this.schema != null) {
-			return this.schema.contains(field);
-		}
-
-		boolean found = false;
-		boolean lookForGetter = false;
-		try {
-			this.getPublicMemberField(field);
-			found = true;
-		} catch (NoSuchFieldException noPublicMember) {
-			// the getter is only tried if the member lookup failed for exactly this reason
-			lookForGetter = true;
-		} catch (Exception ignored) {
-			// any other lookup failure counts as "not contained"
-		}
-
-		if (lookForGetter) {
-			try {
-				this.getGetterMethod(field);
-				found = true;
-			} catch (Exception ignored) {
-				// no usable getter either -- "not contained"
-			}
-		}
-		return found;
-	}
-
-	@Override
-	public Fields getFields() {
-		return this.schema;
-	}
-
-	/**
-	 * Returns the position of {@code field} as reported by the schema. The schema is dereferenced unconditionally, so
-	 * this throws a {@link NullPointerException} for tuples created from raw (non-tuple) input, whose schema is
-	 * {@code null}.
-	 */
-	@Override
-	public int fieldIndex(final String field) {
-		return this.schema.fieldIndex(field);
-	}
-
-	/**
-	 * Delegates to the schema to pick the values of the attributes named in {@code selector} from this tuple. The
-	 * schema is dereferenced unconditionally, so this throws a {@link NullPointerException} for tuples created from
-	 * raw (non-tuple) input, whose schema is {@code null}.
-	 */
-	@Override
-	public List<Object> select(final Fields selector) {
-		return this.schema.select(selector, this.stormTuple);
-	}
-
-	@Override
-	public Object getValue(final int i) {
-		return this.stormTuple.get(i);
-	}
-
-	@Override
-	public String getString(final int i) {
-		return (String) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Integer getInteger(final int i) {
-		return (Integer) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Long getLong(final int i) {
-		return (Long) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Boolean getBoolean(final int i) {
-		return (Boolean) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Short getShort(final int i) {
-		return (Short) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Byte getByte(final int i) {
-		return (Byte) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Double getDouble(final int i) {
-		return (Double) this.stormTuple.get(i);
-	}
-
-	@Override
-	public Float getFloat(final int i) {
-		return (Float) this.stormTuple.get(i);
-	}
-
-	@Override
-	public byte[] getBinary(final int i) {
-		return (byte[]) this.stormTuple.get(i);
-	}
-
-	private Field getPublicMemberField(final String field) throws Exception {
-		assert (this.stormTuple.size() == 1);
-		return this.stormTuple.get(0).getClass().getField(field);
-	}
-
-	private Method getGetterMethod(final String field) throws Exception {
-		assert (this.stormTuple.size() == 1);
-		return this.stormTuple
-				.get(0)
-				.getClass()
-				.getMethod("get" + Character.toUpperCase(field.charAt(0)) + field.substring(1),
-						(Class<?>[]) null);
-	}
-
-	private Object getValueByPublicMember(final String field) throws Exception {
-		assert (this.stormTuple.size() == 1);
-		return getPublicMemberField(field).get(this.stormTuple.get(0));
-	}
-
-	private Object getValueByGetter(final String field) throws Exception {
-		assert (this.stormTuple.size() == 1);
-		return getGetterMethod(field).invoke(this.stormTuple.get(0), (Object[]) null);
-	}
-
-	/**
-	 * Returns the value of the attribute with the given name. With a schema, the name is resolved to a position. Without
-	 * a schema (raw input), the single wrapped object is accessed via a public member field of that name and, only if
-	 * no such field exists, via a matching {@code getXxx()} method.
-	 * 
-	 * @param field
-	 * 		The name of the attribute.
-	 * @return The attribute value, cast unchecked to the type expected by the caller.
-	 * @throws RuntimeException
-	 * 		If the attribute of a raw input object cannot be accessed; the original failure is attached as cause.
-	 */
-	@SuppressWarnings("unchecked")
-	public <T> T getValueByName(final String field) {
-		if (this.schema != null) {
-			final int position = this.schema.fieldIndex(field);
-			return (T) this.getValue(position);
-		}
-		assert (this.stormTuple.size() == 1);
-
-		try {
-			try {
-				// try public member
-				return (T) getValueByPublicMember(field);
-			} catch (NoSuchFieldException noPublicMember) {
-				// try getter-method
-				return (T) getValueByGetter(field);
-			}
-		} catch (Exception failure) {
-			throw new RuntimeException("Could not access field <" + field + ">", failure);
-		}
-	}
-
-	@Override
-	public Object getValueByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public String getStringByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Integer getIntegerByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Long getLongByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Boolean getBooleanByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Short getShortByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Byte getByteByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Double getDoubleByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public Float getFloatByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public byte[] getBinaryByField(final String field) {
-		return getValueByName(field);
-	}
-
-	@Override
-	public List<Object> getValues() {
-		return this.stormTuple;
-	}
-
-	@Override
-	public GlobalStreamId getSourceGlobalStreamid() {
-		// not sure if Flink can support this
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public String getSourceComponent() {
-		// not sure if Flink can support this
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public int getSourceTask() {
-		// not sure if Flink can support this
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public String getSourceStreamId() {
-		// not sure if Flink can support this
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public MessageId getMessageId() {
-		// not sure if Flink can support this
-		throw new UnsupportedOperationException();
-	}
-
-	/** Hash code derived solely from the wrapped Storm values (the schema is not part of it). */
-	@Override
-	public int hashCode() {
-		final int valuesHash = (this.stormTuple != null) ? this.stormTuple.hashCode() : 0;
-		return 31 + valuesHash;
-	}
-
-	/** Two {@link StormTuple}s are equal if they are of the same class and wrap equal Storm values. */
-	@Override
-	public boolean equals(final Object obj) {
-		if (this == obj) {
-			return true;
-		}
-		if ((obj == null) || (this.getClass() != obj.getClass())) {
-			return false;
-		}
-
-		final Values theirs = ((StormTuple<?>) obj).stormTuple;
-		if (this.stormTuple == null) {
-			return theirs == null;
-		}
-		return this.stormTuple.equals(theirs);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
deleted file mode 100644
index 891497e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormWrapperSetupHelper.java
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
-import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-import clojure.lang.Atom;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * {@link StormWrapperSetupHelper} is an helper class used by {@link AbstractStormSpoutWrapper} and
- * {@link StormBoltWrapper}.
- */
-class StormWrapperSetupHelper {
-
-	/** The configuration key for the topology name. */
-	final static String TOPOLOGY_NAME = "storm.topology.name";
-
-	/**
-	 * Determines, per declared output stream, how many output attributes a {@link AbstractStormSpoutWrapper} or
-	 * {@link StormBoltWrapper} has to use. The value is {@code -1} for a stream that is requested as raw output type
-	 * and otherwise the declared number of attributes, which must be within range [0;25] (corresponding to
-	 * {@link org.apache.flink.api.java.tuple.Tuple0 Tuple0} to {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25}).
-	 * 
-	 * @param spoutOrBolt
-	 *            The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type
-	 *            {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} but be of a raw type. (Can be {@code null}.)
-	 * @return The number of attributes to be used for each stream.
-	 * @throws IllegalArgumentException
-	 *             If a stream declares a number of output attributes outside of range [0;25], or if a stream listed
-	 *             in {@code rawOutputs} does not declare exactly one output attribute.
-	 */
-	public static HashMap<String, Integer> getNumberOfAttributes(final IComponent spoutOrBolt,
-			final Collection<String> rawOutputs)
-					throws IllegalArgumentException {
-		final SetupOutputFieldsDeclarer fieldsDeclarer = new SetupOutputFieldsDeclarer();
-		spoutOrBolt.declareOutputFields(fieldsDeclarer);
-		final HashMap<String, Integer> attributesPerStream = fieldsDeclarer.outputSchemas;
-
-		for (final Entry<String, Integer> stream : attributesPerStream.entrySet()) {
-			final int attributeCount = stream.getValue();
-			final boolean withinRange = (attributeCount >= 0) && (attributeCount <= 25);
-			if (!withinRange) {
-				throw new IllegalArgumentException(
-						"Provided bolt declares non supported number of output attributes. Must be in range [0;25] but "
-								+ "was " + attributeCount);
-			}
-
-			final boolean rawRequested = (rawOutputs != null) && rawOutputs.contains(stream.getKey());
-			if (!rawRequested) {
-				continue;
-			}
-			if (attributeCount != 1) {
-				throw new IllegalArgumentException(
-						"Ouput type is requested to be raw type, but provided bolt declares more then one output "
-								+ "attribute.");
-			}
-			// -1 marks a raw output stream
-			stream.setValue(-1);
-		}
-
-		return attributesPerStream;
-	}
-
-	/** Used to computed unique task IDs for a Storm topology. */
-	private static int tid;
-
-	/**
-	 * Creates a {@link TopologyContext} for a Spout or Bolt instance (ie, Flink task / Storm executor).
-	 * <p>
-	 * If {@code stormTopology} is {@code null} (embedded mode), a minimal topology that contains only the given
-	 * operator is created. Otherwise all spouts, bolts, and state spouts of the given topology are registered and the
-	 * task ID of this instance is taken from the component whose ID equals the Flink task name (without the
-	 * {@code "Source: "} prefix).
-	 * <p>
-	 * Side effects: in non-embedded mode the static task ID counter is reset (hence the method is synchronized), and
-	 * the Storm default for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS} is inserted into {@code stormConfig} if the
-	 * key is missing.
-	 * 
-	 * @param context
-	 *            The Flink runtime context.
-	 * @param spoutOrBolt
-	 *            The Spout or Bolt this context is created for.
-	 * @param stormTopology
-	 *            The original Storm topology. (Can be {@code null} for embedded mode.)
-	 * @param stormConfig
-	 *            The user provided configuration. (Must not be {@code null}; may be modified.)
-	 * @return The created {@link TopologyContext}.
-	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public static synchronized TopologyContext createTopologyContext(
-			final StreamingRuntimeContext context, final IComponent spoutOrBolt,
-			StormTopology stormTopology, Map stormConfig) {
-		String operatorName = context.getTaskName();
-		if (operatorName.startsWith("Source: ")) {
-			// prefix "Source: " is inserted by Flink sources by default -- need to get rid of it here
-			operatorName = operatorName.substring(8);
-		}
-		final int dop = context.getNumberOfParallelSubtasks();
-
-		final Map<Integer, String> taskToComponents = new HashMap<Integer, String>();
-		final Map<String, List<Integer>> componentToSortedTasks = new HashMap<String, List<Integer>>();
-		final Map<String, Map<String, Fields>> componentToStreamToFields = new HashMap<String, Map<String, Fields>>();
-		String stormId = (String) stormConfig.get(TOPOLOGY_NAME);
-		String codeDir = null; // not supported
-		String pidDir = null; // not supported
-		Integer taskId = null;
-		Integer workerPort = null; // not supported
-		List<Integer> workerTasks = new ArrayList<Integer>();
-		final Map<String, Object> defaultResources = new HashMap<String, Object>();
-		final Map<String, Object> userResources = new HashMap<String, Object>();
-		final Map<String, Object> executorData = new HashMap<String, Object>();
-		final Map registeredMetrics = new HashMap();
-		Atom openOrPrepareWasCalled = null;
-
-		if (stormTopology == null) {
-			// embedded mode
-			ComponentCommon common = new ComponentCommon();
-			common.set_parallelism_hint(dop);
-
-			HashMap<String, SpoutSpec> spouts = new HashMap<String, SpoutSpec>();
-			HashMap<String, Bolt> bolts = new HashMap<String, Bolt>();
-			if (spoutOrBolt instanceof IRichSpout) {
-				spouts.put(operatorName, new SpoutSpec(null, common));
-			} else {
-				assert (spoutOrBolt instanceof IRichBolt);
-				bolts.put(operatorName, new Bolt(null, common));
-			}
-			stormTopology = new StormTopology(spouts, bolts, new HashMap<String, StateSpoutSpec>());
-
-			// NOTE(review): the subtask index is used as task ID as-is, while the task IDs registered below run
-			// from 1 to dop -- confirm whether this offset is intended
-			taskId = context.getIndexOfThisSubtask();
-
-			List<Integer> sortedTasks = new ArrayList<Integer>(dop);
-			for (int i = 1; i <= dop; ++i) {
-				taskToComponents.put(i, operatorName);
-				sortedTasks.add(i);
-			}
-			componentToSortedTasks.put(operatorName, sortedTasks);
-
-			FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-			spoutOrBolt.declareOutputFields(declarer);
-			componentToStreamToFields.put(operatorName, declarer.outputStreams);
-		} else {
-			// whole topology is built (ie, FlinkTopologyBuilder is used)
-			Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
-			Map<String, Bolt> bolts = stormTopology.get_bolts();
-			Map<String, StateSpoutSpec> stateSpouts = stormTopology.get_state_spouts();
-
-			// task IDs are assigned consecutively over all components, starting at 1
-			tid = 1;
-
-			for (Entry<String, SpoutSpec> spout : spouts.entrySet()) {
-				Integer rc = processSingleOperator(spout.getKey(), spout.getValue().get_common(),
-						operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
-						componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
-			for (Entry<String, Bolt> bolt : bolts.entrySet()) {
-				Integer rc = processSingleOperator(bolt.getKey(), bolt.getValue().get_common(),
-						operatorName, context.getIndexOfThisSubtask(), dop, taskToComponents,
-						componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
-			for (Entry<String, StateSpoutSpec> stateSpout : stateSpouts.entrySet()) {
-				// the result must only be taken over if this component matched; assigning it to taskId directly
-				// would replace an ID found for a spout or bolt above by null
-				Integer rc = processSingleOperator(stateSpout.getKey(), stateSpout
-						.getValue().get_common(), operatorName, context.getIndexOfThisSubtask(),
-						dop, taskToComponents, componentToSortedTasks, componentToStreamToFields);
-				if (rc != null) {
-					taskId = rc;
-				}
-			}
-			assert (taskId != null);
-		}
-
-		if (!stormConfig.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
-			stormConfig.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // Storm default value
-		}
-
-		return new FlinkTopologyContext(stormTopology, stormConfig, taskToComponents,
-				componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir,
-				taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
-				registeredMetrics, openOrPrepareWasCalled);
-	}
-
-	/**
-	 * Sets up {@code taskToComponents}, {@code componentToSortedTasks}, and {@code componentToStreamToFields} for a
-	 * single Spout or Bolt of the topology. Furthermore, it computes the unique task ID of the current Flink task if
-	 * the processed component is the one the task belongs to. One task ID is consumed from the static counter for
-	 * each parallel instance of the component (according to its parallelism hint).
-	 * 
-	 * @param componentId
-	 *            The ID of the Spout/Bolt in the topology.
-	 * @param common
-	 *            The common operator object (that is all Spouts and Bolts have).
-	 * @param operatorName
-	 *            The Flink operator name.
-	 * @param index
-	 *            The index of the currently processed task within its operator.
-	 * @param dop
-	 *            The parallelism of the current Flink operator. (Not needed for the computation; kept for signature
-	 *            compatibility.)
-	 * @param taskToComponents
-	 *            OUTPUT: A map from all task IDs of the topology to their component IDs.
-	 * @param componentToSortedTasks
-	 *            OUTPUT: A map from all component IDs to their sorted list of corresponding task IDs.
-	 * @param componentToStreamToFields
-	 *            OUTPUT: A map from all component IDs to their output streams and output fields.
-	 * 
-	 * @return A unique task ID if the currently processed Spout or Bolt ({@code componentId}) is equal to the current
-	 *         Flink operator ({@code operatorName}) -- {@code null} otherwise.
-	 */
-	private static Integer processSingleOperator(final String componentId,
-			final ComponentCommon common, final String operatorName, final int index,
-			final int dop, final Map<Integer, String> taskToComponents,
-			final Map<String, List<Integer>> componentToSortedTasks,
-			final Map<String, Map<String, Fields>> componentToStreamToFields) {
-		final int parallelism_hint = common.get_parallelism_hint();
-
-		// must be computed before the counter is advanced: tid is the first task ID of this component
-		Integer taskId = null;
-		if (componentId.equals(operatorName)) {
-			taskId = tid + index;
-		}
-
-		// the list holds one entry per parallel instance of THIS component (not of the current Flink operator)
-		List<Integer> sortedTasks = new ArrayList<Integer>(Math.max(parallelism_hint, 0));
-		for (int i = 0; i < parallelism_hint; ++i) {
-			taskToComponents.put(tid, componentId);
-			sortedTasks.add(tid);
-			++tid;
-		}
-		componentToSortedTasks.put(componentId, sortedTasks);
-
-		Map<String, Fields> outputStreams = new HashMap<String, Fields>();
-		for (Entry<String, StreamInfo> outStream : common.get_streams().entrySet()) {
-			outputStreams.put(outStream.getKey(), new Fields(outStream.getValue().get_output_fields()));
-		}
-		componentToStreamToFields.put(componentId, outputStreams);
-
-		return taskId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
deleted file mode 100644
index 0dd9b1c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyBuilderTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import org.apache.flink.stormcompatibility.util.TestDummyBolt;
-import org.apache.flink.stormcompatibility.util.TestDummySpout;
-import org.apache.flink.stormcompatibility.util.TestSink;
-import org.junit.Test;
-
-import backtype.storm.tuple.Fields;
-
-/**
- * Tests for {@link FlinkTopologyBuilder#createTopology()}: invalid wirings are expected to fail with a
- * {@link RuntimeException}, while fields groupings on named output streams are expected to build without error.
- */
-public class FlinkTopologyBuilderTest {
-
-	/** A bolt that subscribes to the component ID "unknown", which was never registered, must be rejected. */
-	@Test(expected = RuntimeException.class)
-	public void testUnknowSpout() {
-		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("unknown");
-		builder.createTopology();
-	}
-
-	/** Same as above, but with a second bolt that is wired correctly before the invalid one is added. */
-	@Test(expected = RuntimeException.class)
-	public void testUnknowBolt() {
-		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt1", new TestBolt()).shuffleGrouping("spout");
-		builder.setBolt("bolt2", new TestBolt()).shuffleGrouping("unknown");
-		builder.createTopology();
-	}
-
-	/**
-	 * A bolt that subscribes to the registered spout via its default stream must be rejected. NOTE(review): presumably
-	 * because {@code TestSpout} does not declare that stream -- {@code TestSpout} is not visible here, confirm.
-	 */
-	@Test(expected = RuntimeException.class)
-	public void testUndeclaredStream() {
-		FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
-		builder.setSpout("spout", new TestSpout());
-		builder.setBolt("bolt", new TestBolt()).shuffleGrouping("spout");
-		builder.createTopology();
-	}
-
-	/** A fields grouping on field "id" of a named spout output stream must build without throwing. */
-	@Test
-	public void testFieldsGroupingOnMultipleSpoutOutputStreams() {
-		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
-
-		flinkBuilder.setSpout("spout", new TestDummySpout());
-		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("spout",
-				TestDummySpout.spoutStreamId, new Fields("id"));
-
-		flinkBuilder.createTopology();
-	}
-
-	/** A fields grouping on field "id" of a named bolt output stream must build without throwing. */
-	@Test
-	public void testFieldsGroupingOnMultipleBoltOutputStreams() {
-		FlinkTopologyBuilder flinkBuilder = new FlinkTopologyBuilder();
-
-		flinkBuilder.setSpout("spout", new TestDummySpout());
-		flinkBuilder.setBolt("bolt", new TestDummyBolt()).shuffleGrouping("spout");
-		flinkBuilder.setBolt("sink", new TestSink()).fieldsGrouping("bolt",
-				TestDummyBolt.groupingStreamId, new Fields("id"));
-
-		flinkBuilder.createTopology();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
deleted file mode 100644
index c98c9a3..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTopologyTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FlinkTopologyTest {
-
-	@Test
-	public void testDefaultParallelism() {
-		final FlinkTopology topology = new FlinkTopology();
-		Assert.assertEquals(1, topology.getParallelism());
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testExecute() throws Exception {
-		new FlinkTopology().execute();
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testExecuteWithName() throws Exception {
-		new FlinkTopology().execute(null);
-	}
-
-	@Test
-	public void testNumberOfTasks() {
-		final FlinkTopology topology = new FlinkTopology();
-
-		Assert.assertEquals(0, topology.getNumberOfTasks());
-
-		topology.increaseNumberOfTasks(3);
-		Assert.assertEquals(3, topology.getNumberOfTasks());
-
-		topology.increaseNumberOfTasks(2);
-		Assert.assertEquals(5, topology.getNumberOfTasks());
-
-		topology.increaseNumberOfTasks(8);
-		Assert.assertEquals(13, topology.getNumberOfTasks());
-	}
-
-	@Test(expected = AssertionError.class)
-	public void testAssert() {
-		new FlinkTopology().increaseNumberOfTasks(0);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
deleted file mode 100644
index 2e4a534..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestBolt.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-public class TestBolt implements IRichBolt {
-	private static final long serialVersionUID = -667148827441397683L;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {}
-
-	@Override
-	public void execute(Tuple input) {}
-
-	@Override
-	public void cleanup() {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
deleted file mode 100644
index 146218f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestSpout.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import java.util.Map;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-
-public class TestSpout implements IRichSpout {
-	private static final long serialVersionUID = -4884029383198924007L;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
-
-	@Override
-	public void close() {}
-
-	@Override
-	public void activate() {}
-
-	@Override
-	public void deactivate() {}
-
-	@Override
-	public void nextTuple() {}
-
-	@Override
-	public void ack(Object msgId) {}
-
-	@Override
-	public void fail(Object msgId) {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
deleted file mode 100644
index f664e58..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/api/TestTopologyBuilder.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-
-public class TestTopologyBuilder extends FlinkTopologyBuilder {
-	@Override
-	public StormTopology getStormTopology() {
-		return super.getStormTopology();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
deleted file mode 100644
index 94a50cf..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/AbstractTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.junit.Before;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Random;
-
-public abstract class AbstractTest {
-	private static final Logger LOG = LoggerFactory.getLogger(AbstractTest.class);
-
-	protected long seed;
-	protected Random r;
-
-	@Before
-	public void prepare() {
-		this.seed = System.currentTimeMillis();
-		this.r = new Random(this.seed);
-		LOG.info("Test seed: {}", new Long(this.seed));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
deleted file mode 100644
index 1891873..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FiniteTestSpout.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-
-import java.util.Map;
-
-public class FiniteTestSpout implements IRichSpout {
-	private static final long serialVersionUID = 7992419478267824279L;
-
-	private int numberOfOutputTuples;
-	private SpoutOutputCollector collector;
-
-	public FiniteTestSpout(final int numberOfOutputTuples) {
-		this.numberOfOutputTuples = numberOfOutputTuples;
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-		this.collector = collector;
-	}
-
-	@Override
-	public void close() {/* nothing to do */}
-
-	@Override
-	public void activate() {/* nothing to do */}
-
-	@Override
-	public void deactivate() {/* nothing to do */}
-
-	@Override
-	public void nextTuple() {
-		if (--this.numberOfOutputTuples >= 0) {
-			this.collector.emit(new Values(new Integer(this.numberOfOutputTuples)));
-		}
-	}
-
-	@Override
-	public void ack(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void fail(final Object msgId) {/* nothing to do */}
-
-	@Override
-	public void declareOutputFields(final OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("dummy"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
deleted file mode 100644
index 8e63563..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.stormcompatibility.util.FlinkOutputFieldsDeclarer;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.LinkedList;
-
-public class FlinkOutputFieldsDeclarerTest extends AbstractTest {
-
-
-
-	@Test
-	public void testNull() {
-		Assert.assertNull(new FlinkOutputFieldsDeclarer().getOutputType(null));
-	}
-
-	@Test
-	public void testDeclare() {
-		for (int i = 0; i < 2; ++i) { // test case: simple / non-direct
-			for (int j = 1; j < 2; ++j) { // number of streams
-				for (int k = 0; k <= 25; ++k) { // number of attributes
-					this.runDeclareTest(i, j, k);
-				}
-			}
-		}
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareSimpleToManyAttributes() {
-		this.runDeclareTest(0, this.r.nextBoolean() ? 1 : 2, 26);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareNonDirectToManyAttributes() {
-		this.runDeclareTest(1, this.r.nextBoolean() ? 1 : 2, 26);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareDefaultStreamToManyAttributes() {
-		this.runDeclareTest(2, this.r.nextBoolean() ? 1 : 2, 26);
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareFullToManyAttributes() {
-		this.runDeclareTest(3, this.r.nextBoolean() ? 1 : 2, 26);
-	}
-
-	private void runDeclareTest(final int testCase, final int numberOfStreams,
-			final int numberOfAttributes) {
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-
-		String[] streams = null;
-		if (numberOfStreams > 1 || r.nextBoolean()) {
-			streams = new String[numberOfStreams];
-			for (int i = 0; i < numberOfStreams; ++i) {
-				streams[i] = "stream" + i;
-			}
-		}
-
-		final String[] attributes = new String[numberOfAttributes];
-		for (int i = 0; i < attributes.length; ++i) {
-			attributes[i] = "a" + i;
-		}
-
-		switch (testCase) {
-		case 0:
-			this.declareSimple(declarer, streams, attributes);
-			break;
-		default:
-			this.declareNonDirect(declarer, streams, attributes);
-		}
-
-		if (streams == null) {
-			streams = new String[] { Utils.DEFAULT_STREAM_ID };
-		}
-
-		for (String stream : streams) {
-			final TypeInformation<?> type = declarer.getOutputType(stream);
-
-			if (numberOfAttributes == 1) {
-				Assert.assertEquals(type.getClass(), GenericTypeInfo.class);
-				Assert.assertEquals(type.getTypeClass(), Object.class);
-			} else {
-				Assert.assertEquals(numberOfAttributes, type.getArity());
-				Assert.assertTrue(type.isTupleType());
-			}
-		}
-	}
-
-	private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-			final String[] attributes) {
-
-		if (streams != null) {
-			for (String stream : streams) {
-				declarer.declareStream(stream, new Fields(attributes));
-			}
-		} else {
-			declarer.declare(new Fields(attributes));
-		}
-	}
-
-	private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] streams,
-			final String[] attributes) {
-
-		if (streams != null) {
-			for (String stream : streams) {
-				declarer.declareStream(stream, false, new Fields(attributes));
-			}
-		} else {
-			declarer.declare(false, new Fields(attributes));
-		}
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testUndeclared() {
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-		declarer.getOutputType("unknownStreamId");
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirect() {
-		new FlinkOutputFieldsDeclarer().declare(true, null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirect2() {
-		new FlinkOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, null);
-	}
-
-	@Test
-	public void testGetGroupingFieldIndexes() {
-		final int numberOfAttributes = 5 + this.r.nextInt(21);
-		final String[] attributes = new String[numberOfAttributes];
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			attributes[i] = "a" + i;
-		}
-
-		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
-		declarer.declare(new Fields(attributes));
-
-		final int numberOfKeys = 1 + this.r.nextInt(25);
-		final LinkedList<String> groupingFields = new LinkedList<String>();
-		final boolean[] indexes = new boolean[numberOfAttributes];
-
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			if (this.r.nextInt(26) < numberOfKeys) {
-				groupingFields.add(attributes[i]);
-				indexes[i] = true;
-			} else {
-				indexes[i] = false;
-			}
-		}
-
-		final int[] expectedResult = new int[groupingFields.size()];
-		int j = 0;
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			if (indexes[i]) {
-				expectedResult[j++] = i;
-			}
-		}
-
-		final int[] result = declarer.getGroupingFieldIndexes(Utils.DEFAULT_STREAM_ID,
-				groupingFields);
-
-		Assert.assertEquals(expectedResult.length, result.length);
-		for (int i = 0; i < expectedResult.length; ++i) {
-			Assert.assertEquals(expectedResult[i], result[i]);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
deleted file mode 100644
index c3cb7d7..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelectorTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Iterator;
-
-import org.apache.flink.stormcompatibility.util.FlinkStormStreamSelector;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class FlinkStormStreamSelectorTest {
-
-	@Test
-	public void testSelector() {
-		FlinkStormStreamSelector<Object> selector = new FlinkStormStreamSelector<Object>();
-		SplitStreamType<Object> tuple = new SplitStreamType<Object>();
-		Iterator<String> result;
-
-		tuple.streamId = "stream1";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream1", result.next());
-		Assert.assertFalse(result.hasNext());
-
-		tuple.streamId = "stream2";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream2", result.next());
-		Assert.assertFalse(result.hasNext());
-
-		tuple.streamId = "stream1";
-		result = selector.select(tuple).iterator();
-		Assert.assertEquals("stream1", result.next());
-		Assert.assertFalse(result.hasNext());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
deleted file mode 100644
index bd9ea3f..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContextTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.HashMap;
-
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-
-import org.apache.flink.stormcompatibility.util.FlinkTopologyContext;
-import org.junit.Test;
-
-
-/*
- * FlinkTopologyContext.getSources(componentId) and FlinkTopologyContext.getTargets(componentId) are not tested here,
- * because those are tested in StormWrapperSetupHelperTest.
- */
-public class FlinkTopologyContextTest extends AbstractTest {
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testAddTaskHook() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.addTaskHook(null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetHooks() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.getHooks();
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testRegisteredMetric1() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.registerMetric(null, (ICombiner) null, 0);
-	}
-
-	@SuppressWarnings("rawtypes")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testRegisteredMetric2() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.registerMetric(null, (IReducer) null, 0);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testRegisteredMetric3() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.registerMetric(null, (IMetric) null, 0);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testGetRegisteredMetricByName() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.getRegisteredMetricByName(null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testSetAllSubscribedState() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.setAllSubscribedState(null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testSetSubscribedState1() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.setSubscribedState(null, null);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testSetSubscribedState2() {
-		new FlinkTopologyContext(new StormTopology(new HashMap<String, SpoutSpec>(),
-				new HashMap<String, Bolt>(), new HashMap<String, StateSpoutSpec>()), null, null,
-				null, null, null, null, null, null, null, null, null, null, null, null, null)
-		.setSubscribedState(null, null, null);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
deleted file mode 100644
index b499373..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummyBolt.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class TestDummyBolt implements IRichBolt {
-	private static final long serialVersionUID = 6893611247443121322L;
-
-	public final static String shuffleStreamId = "shuffleStream";
-	public final static String groupingStreamId = "groupingStream";
-
-	private boolean emit = true;
-	private TopologyContext context;
-	private OutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		this.context = context;
-		this.collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if (this.context.getThisTaskIndex() == 0) {
-			this.collector.emit(shuffleStreamId, input.getValues());
-		}
-		if (this.emit) {
-			this.collector.emit(groupingStreamId, new Values("bolt", this.context));
-			this.emit = false;
-		}
-	}
-
-	@Override
-	public void cleanup() {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream(shuffleStreamId, new Fields("data"));
-		declarer.declareStream(groupingStreamId, new Fields("id", "data"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
deleted file mode 100644
index 345ca12..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestDummySpout.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Map;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-public class TestDummySpout implements IRichSpout {
-	private static final long serialVersionUID = -5190945609124603118L;
-
-	public final static String spoutStreamId = "spout-stream";
-
-	private boolean emit = true;
-	private TopologyContext context;
-	private SpoutOutputCollector collector;
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-		this.context = context;
-		this.collector = collector;
-	}
-
-	@Override
-	public void close() {}
-
-	@Override
-	public void activate() {}
-
-	@Override
-	public void deactivate() {}
-
-	@Override
-	public void nextTuple() {
-		if (this.emit) {
-			this.collector.emit(new Values(this.context));
-			this.emit = false;
-		}
-	}
-
-	@Override
-	public void ack(Object msgId) {}
-
-	@Override
-	public void fail(Object msgId) {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("data"));
-		declarer.declareStream(spoutStreamId, new Fields("id", "data"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
deleted file mode 100644
index c8e5584..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/util/TestSink.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-
-public class TestSink implements IRichBolt {
-	private static final long serialVersionUID = 4314871456719370877L;
-
-	public final static List<TopologyContext> result = new LinkedList<TopologyContext>();
-
-	@SuppressWarnings("rawtypes")
-	@Override
-	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-		result.add(context);
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		if (input.size() == 1) {
-			result.add((TopologyContext) input.getValue(0));
-		} else {
-			result.add((TopologyContext) input.getValue(1));
-		}
-	}
-
-	@Override
-	public void cleanup() {}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
deleted file mode 100644
index b44e8a1..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapperTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(StormWrapperSetupHelper.class)
-public class FiniteStormSpoutWrapperTest {
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void runAndExecuteTest1() throws Exception {
-		final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
-		when(stormSpout.reachedEnd()).thenReturn(false, false, false, true, false, false, true);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
-		wrapper.setRuntimeContext(taskContext);
-
-		wrapper.run(mock(SourceContext.class));
-		verify(stormSpout, times(3)).nextTuple();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void runAndExecuteTest2() throws Exception {
-		final FiniteStormSpout stormSpout = mock(FiniteStormSpout.class);
-		when(stormSpout.reachedEnd()).thenReturn(true, false, true, false, true, false, true);
-
-		final StreamingRuntimeContext taskContext = mock(StreamingRuntimeContext.class);
-		when(taskContext.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(taskContext.getTaskStubParameters()).thenReturn(new Configuration());
-		when(taskContext.getTaskName()).thenReturn("name");
-
-		final FiniteStormSpoutWrapper<?> wrapper = new FiniteStormSpoutWrapper<Object>(stormSpout);
-		wrapper.setRuntimeContext(taskContext);
-
-		wrapper.run(mock(SourceContext.class));
-		verify(stormSpout, never()).nextTuple();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
deleted file mode 100644
index 738eb1e..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarerTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-
-public class SetupOutputFieldsDeclarerTest extends AbstractTest {
-
-	@Test
-	public void testDeclare() {
-		final SetupOutputFieldsDeclarer declarer = new SetupOutputFieldsDeclarer();
-
-		int numberOfAttributes = this.r.nextInt(26);
-		declarer.declare(createSchema(numberOfAttributes));
-		Assert.assertEquals(1, declarer.outputSchemas.size());
-		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(Utils.DEFAULT_STREAM_ID)
-				.intValue());
-
-		final String sid = "streamId";
-		numberOfAttributes = this.r.nextInt(26);
-		declarer.declareStream(sid, createSchema(numberOfAttributes));
-		Assert.assertEquals(2, declarer.outputSchemas.size());
-		Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
-	}
-
-	private Fields createSchema(final int numberOfAttributes) {
-		final ArrayList<String> schema = new ArrayList<String>(numberOfAttributes);
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			schema.add("a" + i);
-		}
-		return new Fields(schema);
-	}
-
-	@Test
-	public void testDeclareDirect() {
-		new SetupOutputFieldsDeclarer().declare(false, new Fields());
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareDirectFail() {
-		new SetupOutputFieldsDeclarer().declare(true, new Fields());
-	}
-
-	@Test
-	public void testDeclareStream() {
-		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, new Fields());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareStreamFail() {
-		new SetupOutputFieldsDeclarer().declareStream(null, new Fields());
-	}
-
-	@Test
-	public void testDeclareFullStream() {
-		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testDeclareFullStreamFailNonDefaultStream() {
-		new SetupOutputFieldsDeclarer().declareStream(null, false, new Fields());
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testDeclareFullStreamFailDirect() {
-		new SetupOutputFieldsDeclarer().declareStream(Utils.DEFAULT_STREAM_ID, true, new Fields());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
deleted file mode 100644
index d01c3e0..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/test/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollectorTest.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.tuple.Values;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.AbstractTest;
-import org.apache.flink.streaming.api.operators.Output;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-public class StormBoltCollectorTest extends AbstractTest {
-
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Test
-	public void testBoltStormCollector() throws InstantiationException, IllegalAccessException {
-		for (int numberOfAttributes = -1; numberOfAttributes < 26; ++numberOfAttributes) {
-			final Output flinkCollector = mock(Output.class);
-			Tuple flinkTuple = null;
-			final Values tuple = new Values();
-
-			StormBoltCollector<?> collector;
-
-			final String streamId = "streamId";
-			HashMap<String, Integer> attributes = new HashMap<String, Integer>();
-			attributes.put(streamId, numberOfAttributes);
-
-			if (numberOfAttributes == -1) {
-				collector = new StormBoltCollector(attributes, flinkCollector);
-				tuple.add(new Integer(this.r.nextInt()));
-
-			} else {
-				collector = new StormBoltCollector(attributes, flinkCollector);
-				flinkTuple = Tuple.getTupleClass(numberOfAttributes).newInstance();
-
-				for (int i = 0; i < numberOfAttributes; ++i) {
-					tuple.add(new Integer(this.r.nextInt()));
-					flinkTuple.setField(tuple.get(i), i);
-				}
-			}
-
-			final Collection anchors = mock(Collection.class);
-			final List<Integer> taskIds;
-			taskIds = collector.emit(streamId, anchors, tuple);
-
-			Assert.assertNull(taskIds);
-
-			if (numberOfAttributes == -1) {
-				verify(flinkCollector).collect(tuple.get(0));
-			} else {
-				verify(flinkCollector).collect(flinkTuple);
-			}
-		}
-	}
-
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testReportError() {
-		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).reportError(null);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testEmitDirect() {
-		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).emitDirect(0, null,
-				null, null);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testAck() {
-		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).ack(null);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test(expected = UnsupportedOperationException.class)
-	public void testFail() {
-		new StormBoltCollector<Object>(mock(HashMap.class), mock(Output.class)).fail(null);
-	}
-
-}