You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:37 UTC

[12/15] flink git commit: [Storm Compatibility] Maven module restructuring and cleanup - removed storm-parent; renamed storm-core and storm-examples - updated internal Java package structure * renamed package "stormcompatibility" to "storm" *

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 30227b8..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
-	private static final long serialVersionUID = 4672434660037669254L;
-
-	private final ArrayKeySelector<Tuple> selector;
-
-	public SplitStreamTypeKeySelector(int... fields) {
-		this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
-	}
-
-	@Override
-	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-		return selector.getKey(value.value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
deleted file mode 100644
index 114fa7c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * This interface represents a Storm spout that emits a finite number of records. Common Storm
- * spouts emit infinite streams by default. To change this behaviour and take advantage of
- * Flink's finite-source capabilities, the spout should implement this interface. To wrap
- * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
- */
-public interface FiniteStormSpout extends IRichSpout {
-
-	/**
-	 * Returns true when the spout has reached the end of the stream.
-	 *
-	 * @return true, if the spout's stream reached its end, false otherwise
-	 */
-	public boolean reachedEnd();
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index 3eee8d6..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Flink does not support direct emit.</strong>
- */
-public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-	/** The declared output streams and schemas. */
-	public final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
-
-	@Override
-	public void declare(final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code direct} is {@code true}
-	 */
-	@Override
-	public void declare(final boolean direct, final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-	}
-
-	@Override
-	public void declareStream(final String streamId, final Fields fields) {
-		this.declareStream(streamId, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code direct} is {@code true}
-	 */
-	@Override
-	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (direct) {
-			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-		}
-
-		this.outputStreams.put(streamId, fields);
-	}
-
-	/**
-	 * Returns {@link TypeInformation} for the declared output schema for a specific stream.
-	 * 
-	 * @param streamId
-	 *            A stream ID.
-	 * 
-	 * @return output type information for the declared output schema of the specified stream; or {@code null} if
-	 *         {@code streamId == null}
-	 * 
-	 * @throws IllegalArgumentException
-	 *             If no output schema was declared for the specified stream or if more than 25 attributes were declared.
-	 */
-	public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
-		if (streamId == null) {
-			return null;
-		}
-
-		Fields outputSchema = this.outputStreams.get(streamId);
-		if (outputSchema == null) {
-			throw new IllegalArgumentException("Stream with ID '" + streamId
-					+ "' was not declared.");
-		}
-
-		Tuple t;
-		final int numberOfAttributes = outputSchema.size();
-
-		if (numberOfAttributes == 1) {
-			return TypeExtractor.getForClass(Object.class);
-		} else if (numberOfAttributes <= 25) {
-			try {
-				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
-			} catch (final InstantiationException e) {
-				throw new RuntimeException(e);
-			} catch (final IllegalAccessException e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
-		}
-
-		// TODO: declare only key fields as DefaultComparable
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			t.setField(new DefaultComparable(), i);
-		}
-
-		return TypeExtractor.getForObject(t);
-	}
-
-	/**
-	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
-	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType(String)}. If key fields are not comparable,
-	 * Flink cannot use them and will throw an exception.
-	 */
-	private static class DefaultComparable implements Comparable<DefaultComparable> {
-
-		public DefaultComparable() {
-		}
-
-		@Override
-		public int compareTo(final DefaultComparable o) {
-			return 0;
-		}
-	}
-
-	/**
-	 * Computes the indexes within the declared output schema of the specified stream, for a list of given
-	 * field-grouping attributes.
-	 * 
-	 * @param streamId
-	 *            A stream ID.
-	 * @param groupingFields
-	 *            The names of the key fields.
-	 * 
-	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
-	 *         list
-	 */
-	public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
-		final int[] fieldIndexes = new int[groupingFields.size()];
-
-		for (int i = 0; i < fieldIndexes.length; ++i) {
-			fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
-		}
-
-		return fieldIndexes;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
deleted file mode 100644
index 7e60a87..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-/**
- * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
- */
-public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
-	private static final long serialVersionUID = 2553423379715401023L;
-
-	/** internal cache to avoid short living ArrayList objects. */
-	private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
-
-	@Override
-	public Iterable<String> select(SplitStreamType<T> value) {
-		String sid = value.streamId;
-		List<String> streamId = this.streams.get(sid);
-		if (streamId == null) {
-			streamId = new ArrayList<String>(1);
-			streamId.add(sid);
-			this.streams.put(sid, streamId);
-		}
-		return streamId;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
deleted file mode 100644
index 14af830..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import clojure.lang.Atom;
-
-/**
- * {@link FlinkTopologyContext} is a {@link TopologyContext} that overrides certain methods that are not applicable when
- * a Storm topology is executed within Flink.
- */
-public final class FlinkTopologyContext extends TopologyContext {
-
-	/**
-	 * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
-	 * for each parallel task
-	 */
-	public FlinkTopologyContext(final StormTopology topology, @SuppressWarnings("rawtypes") final Map stormConf,
-			final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks,
-			final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir,
-			final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks,
-			final Map<String, Object> defaultResources, final Map<String, Object> userResources,
-			final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics,
-			final Atom openOrPrepareWasCalled) {
-		super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId,
-				codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
-				registeredMetrics, openOrPrepareWasCalled);
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public void addTaskHook(final ITaskHook hook) {
-		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public Collection<ITaskHook> getHooks() {
-		throw new UnsupportedOperationException("Task hooks are not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public IMetric getRegisteredMetricByName(final String name) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink");
-
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@SuppressWarnings("rawtypes")
-	@Override
-	public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@SuppressWarnings("rawtypes")
-	@Override
-	public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@SuppressWarnings("unchecked")
-	@Override
-	public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
-		throw new UnsupportedOperationException("Metrics are not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
-		throw new UnsupportedOperationException("Not supported by Flink");
-
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
-		throw new UnsupportedOperationException("Not supported by Flink");
-	}
-
-	/**
-	 * Not supported by Flink.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T
-			obj) {
-		throw new UnsupportedOperationException("Not supported by Flink");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
deleted file mode 100644
index 9cb44ec..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-/**
- * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get
- * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
- * {@link SplitStream#select(String...) .select(...)}).
- * 
- * @param <T>
- */
-public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
-	private static final long serialVersionUID = 3550359150160908564L;
-
-	@Override
-	public T map(SplitStreamType<T> value) throws Exception {
-		return value.value;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
deleted file mode 100644
index 9c7e477..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Used by {@link org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
- * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
- * {@link FlinkStormStreamSelector}.
- */
-public class SplitStreamType<T> {
-
-	/** The stream ID this tuple belongs to. */
-	public String streamId;
-	/** The actual data value. */
-	public T value;
-
-	@Override
-	public String toString() {
-		return "<sid:" + this.streamId + ",v:" + this.value + ">";
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		SplitStreamType<?> other = (SplitStreamType<?>) o;
-
-		return this.streamId.equals(other.streamId) && this.value.equals(other.value);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 200f772..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- * 
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
-	private static final long serialVersionUID = 4672434660037669254L;
-
-	private final ArrayKeySelector<Tuple> selector;
-
-	public SplitStreamTypeKeySelector(int... fields) {
-		this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
-	}
-
-	@Override
-	public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
-		return selector.getKey(value.value);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
deleted file mode 100644
index 6726ae8..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import backtype.storm.Config;
-
-/**
- * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
- * object) for embedded Spouts and Bolts.
- */
-@SuppressWarnings("rawtypes")
-public final class StormConfig extends GlobalJobParameters implements Map {
-	private static final long serialVersionUID = 8019519109673698490L;
-
-	/** Contains the actual configuration that is provided to Spouts and Bolts. */
-	private final Map config = new HashMap();
-
-	/**
-	 * Creates an empty configuration.
-	 */
-	public StormConfig() {
-	}
-
-	/**
-	 * Creates a configuration with initial values provided by the given {@code Map}.
-	 * 
-	 * @param config
-	 *            Initial values for this configuration.
-	 */
-	@SuppressWarnings("unchecked")
-	public StormConfig(Map config) {
-		this.config.putAll(config);
-	}
-
-
-	@Override
-	public int size() {
-		return this.config.size();
-	}
-
-	@Override
-	public boolean isEmpty() {
-		return this.config.isEmpty();
-	}
-
-	@Override
-	public boolean containsKey(Object key) {
-		return this.config.containsKey(key);
-	}
-
-	@Override
-	public boolean containsValue(Object value) {
-		return this.config.containsValue(value);
-	}
-
-	@Override
-	public Object get(Object key) {
-		return this.config.get(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Object put(Object key, Object value) {
-		return this.config.put(key, value);
-	}
-
-	@Override
-	public Object remove(Object key) {
-		return this.config.remove(key);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void putAll(Map m) {
-		this.config.putAll(m);
-	}
-
-	@Override
-	public void clear() {
-		this.config.clear();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<Object> keySet() {
-		return this.config.keySet();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Collection<Object> values() {
-		return this.config.values();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public Set<java.util.Map.Entry<Object, Object>> entrySet() {
-		return this.config.entrySet();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
deleted file mode 100644
index 7b35a64..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-/**
- * An {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
- */
-abstract class AbstractStormCollector<OUT> {
-
-	/** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */
-	protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>();
-	/** Flink split tuple. Used, if multiple output streams are declared. */
-	private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>();
-	/**
-	 * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}).
-	 * If the number of attributes of a stream is negative, {@link #outputTuple} is not used and "raw" data type is used.
-	 */
-	protected final HashMap<String, Integer> numberOfAttributes;
-	/** Indicates if multiple output streams are declared and thus {@link SplitStreamType} must be used as output. */
-	private final boolean split;
-	/** Is set to {@code true} each time a tuple is emitted. */
-	boolean tupleEmitted = false;
-
-	/**
-	 * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the
-	 * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
-	 * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
-	 * 
-	 * @param numberOfAttributes
-	 *            The number of attributes of the emitted tuples per output stream.
-	 * @throws UnsupportedOperationException
-	 *             if the specified number of attributes is greater than 25
-	 */
-	public AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes)
-			throws UnsupportedOperationException {
-		assert (numberOfAttributes != null);
-
-		this.numberOfAttributes = numberOfAttributes;
-		this.split = this.numberOfAttributes.size() > 1;
-
-		for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) {
-			final int numAtt = outputStream.getValue();
-			assert (numAtt >= -1);
-
-			if (numAtt > 25) {
-				throw new UnsupportedOperationException(
-						"Flink cannot handle more then 25 attributes, but " + numAtt
-						+ " are declared for stream '" + outputStream.getKey()
-						+ "' by the given bolt");
-			} else if (numAtt >= 0) {
-				try {
-					this.outputTuple.put(outputStream.getKey(),
-							org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
-							.newInstance());
-				} catch (final InstantiationException e) {
-					throw new RuntimeException(e);
-				} catch (final IllegalAccessException e) {
-					throw new RuntimeException(e);
-				}
-
-			}
-		}
-	}
-
-	/**
-	 * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
-	 * to the specified output stream.
-	 * 
-	 * @param streamId
-	 *            The output stream id.
-	 * @param tuple
-	 *            The Storm tuple to be emitted.
-	 * @return the return value of {@link #doEmit(Object)}
-	 */
-	@SuppressWarnings("unchecked")
-	protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
-		List<Integer> taskIds;
-
-		final int numAtt = this.numberOfAttributes.get(streamId);
-		if (numAtt > -1) {
-			assert (tuple.size() == numAtt);
-			Tuple out = this.outputTuple.get(streamId);
-			for (int i = 0; i < numAtt; ++i) {
-				out.setField(tuple.get(i), i);
-			}
-			if (this.split) {
-				this.splitTuple.streamId = streamId;
-				this.splitTuple.value = out;
-
-				taskIds = doEmit((OUT) this.splitTuple);
-			} else {
-				taskIds = doEmit((OUT) out);
-			}
-
-		} else {
-			assert (tuple.size() == 1);
-			if (split) {
-				this.splitTuple.streamId = streamId;
-				this.splitTuple.value = tuple.get(0);
-
-				taskIds = doEmit((OUT) this.splitTuple);
-			} else {
-				taskIds = doEmit((OUT) tuple.get(0));
-			}
-		}
-		this.tupleEmitted = true;
-
-		return taskIds;
-	}
-
-	/**
-	 * Emits a Flink tuple.
-	 * 
-	 * @param flinkTuple
-	 * 		The tuple to be emitted.
-	 * @return the IDs of the tasks this tuple was sent to
-	 */
-	protected abstract List<Integer> doEmit(OUT flinkTuple);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
deleted file mode 100644
index ccd29bb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-/**
- * An {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
- * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
- * {@link StormSpoutCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the spout's <code>open(..)</code> method.
- * Furthermore, ack and fail back calls as well as tuple IDs are not supported so far.</strong>
- */
-public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
-	private static final long serialVersionUID = 4993283609095408765L;
-
-	/** Number of attributes of the spout's output tuples per stream. */
-	private final HashMap<String, Integer> numberOfAttributes;
-	/** The wrapped Storm {@link IRichSpout spout}. */
-	protected final IRichSpout spout;
-	/** The wrapper of the given Flink collector. */
-	protected StormSpoutCollector<OUT> collector;
-	/** Indicates, if the source is still running or was canceled. */
-	protected volatile boolean isRunning = true;
-	/** The original Storm topology. */
-	protected StormTopology stormTopology;
-
-	/**
-	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
-	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 *
-	 * @param spout
-	 * 		The Storm {@link IRichSpout spout} to be used.
-	 * @throws IllegalArgumentException
-	 * 		If the number of declared output attributes is not within range [0;25].
-	 */
-	public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		this(spout, null);
-	}
-
-	/**
-	 * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
-	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
-	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
-	 *             [0;25].
-	 */
-	public AbstractStormSpoutWrapper(final IRichSpout spout,
-			final Collection<String> rawOutputs)
-					throws IllegalArgumentException {
-		this.spout = spout;
-		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
-	}
-
-	/**
-	 * Sets the original Storm topology.
-	 * 
-	 * @param stormTopology
-	 *            The original Storm topology.
-	 */
-	public void setStormTopology(StormTopology stormTopology) {
-		this.stormTopology = stormTopology;
-	}
-
-	@Override
-	public final void run(final SourceContext<OUT> ctx) throws Exception {
-		this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
-
-		GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
-		StormConfig stormConfig = new StormConfig();
-
-		if (config != null) {
-			if (config instanceof StormConfig) {
-				stormConfig = (StormConfig) config;
-			} else {
-				stormConfig.putAll(config.toMap());
-			}
-		}
-
-		this.spout.open(stormConfig,
-				StormWrapperSetupHelper.createTopologyContext(
-					(StreamingRuntimeContext) super.getRuntimeContext(),
-					this.spout,
-					this.stormTopology,
-					null),
-				new SpoutOutputCollector(this.collector));
-		this.spout.activate();
-		this.execute();
-	}
-
-	/**
-	 * Needs to be implemented to call the given Spout's {@link IRichSpout#nextTuple() nextTuple()} method. This method
-	 * might use a {@code while(true)}-loop to emit an infinite number of tuples.
-	 */
-	protected abstract void execute();
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Sets the {@link #isRunning} flag to {@code false}.
-	 */
-	@Override
-	public void cancel() {
-		this.isRunning = false;
-	}
-
-	@Override
-	public void close() throws Exception {
-		this.spout.close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
deleted file mode 100644
index f499ecc..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
- * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
- * FiniteStormSpout#reachedEnd()} is true.
- */
-public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
-	private static final long serialVersionUID = -218340336648247605L;
-
-	private FiniteStormSpout finiteSpout;
-
-	/**
-	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
-	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link FiniteStormSpout spout} to be used.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not within range [0;25].
-	 */
-	public FiniteStormSpoutWrapper(FiniteStormSpout spout)
-			throws IllegalArgumentException {
-		super(spout);
-		this.finiteSpout = spout;
-	}
-
-	/**
-	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
-	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
-	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link FiniteStormSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
-	 *             [0;25].
-	 */
-	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final String[] rawOutputs)
-			throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs));
-	}
-
-	/**
-	 * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
-	 * that it can be used within a Flink streaming program. The output type can be any type if parameter
-	 * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
-	 * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
-	 * number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link FiniteStormSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOutput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
-	 *             [0;25].
-	 */
-	public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
-		super(spout, rawOutputs);
-		this.finiteSpout = spout;
-	}
-
-	/**
-	 * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link FiniteStormSpout#reachedEnd()} is true or
-	 * {@link #cancel()} is called.
-	 */
-	@Override
-	protected void execute() {
-		while (super.isRunning && !finiteSpout.reachedEnd()) {
-			finiteSpout.nextTuple();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
deleted file mode 100644
index 3cd27d4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
- * method.
- */
-class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-	/** The number of attributes for each declared stream by the wrapped operator. */
-	HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
-
-	@Override
-	public void declare(final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-	}
-
-	@Override
-	public void declare(final boolean direct, final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-	}
-
-	@Override
-	public void declareStream(final String streamId, final Fields fields) {
-		this.declareStream(streamId, false, fields);
-	}
-
-	@Override
-	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (streamId == null) {
-			throw new IllegalArgumentException("Stream ID cannot be null.");
-		}
-		if (direct) {
-			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-		}
-
-		this.outputSchemas.put(streamId, fields.size());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
deleted file mode 100644
index e810214..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.util.Collector;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to provide a Storm compatible
- * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
- * and emits them via the provided {@link Output} object.
- */
-class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
-
-	/** The Flink output Collector */
-	private final Collector<OUT> flinkOutput;
-
-	/**
-	 * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink output object. If the
-	 * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
-	 * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
-	 * 
-	 * @param numberOfAttributes
-	 *            The number of attributes of the emitted tuples per output stream.
-	 * @param flinkOutput
-	 *            The Flink output object to be used.
-	 * @throws UnsupportedOperationException
-	 *             if the specified number of attributes is greater than 25
-	 */
-	public StormBoltCollector(final HashMap<String, Integer> numberOfAttributes,
-			final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
-		super(numberOfAttributes);
-		assert (flinkOutput != null);
-		this.flinkOutput = flinkOutput;
-	}
-
-	@Override
-	protected List<Integer> doEmit(final OUT flinkTuple) {
-		this.flinkOutput.collect(flinkTuple);
-		// TODO
-		return null;
-	}
-
-	@Override
-	public void reportError(final Throwable error) {
-		// not sure, if Flink can support this
-		throw new UnsupportedOperationException("Not implemented yet");
-	}
-
-	@Override
-	public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
-		return this.tansformAndEmit(streamId, tuple);
-	}
-
-	@Override
-	public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
-		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-	}
-
-	@Override
-	public void ack(final Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
-	}
-
-	@Override
-	public void fail(final Tuple input) {
-		throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
deleted file mode 100644
index 715d6df..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
- * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
- * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
- * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method.
- * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so
- * far.</strong>
- */
-public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
-	private static final long serialVersionUID = -4788589118464155835L;
-
-	/** The wrapped Storm {@link IRichBolt bolt}. */
-	private final IRichBolt bolt;
-	/** Number of attributes of the bolt's output tuples per stream. */
-	private final HashMap<String, Integer> numberOfAttributes;
-	/** The schema (ie, ordered field names) of the input stream. */
-	private final Fields inputSchema;
-	/** The original Storm topology. */
-	protected StormTopology stormTopology;
-
-	/**
-	 *  We have to use this because Operators must output
-	 *  {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
-	 */
-	private TimestampedCollector<OUT> flinkCollector;
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
-	 * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
-	 * declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [0;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
-		this(bolt, null, (Collection<String>) null);
-	}
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
-	 * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
-	 * the bolt's declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @param inputSchema
-	 *            The schema (ie, ordered field names) of the input stream.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [0;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
-			throws IllegalArgumentException {
-		this(bolt, inputSchema, (Collection<String>) null);
-	}
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
-	 * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
-	 * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
-	 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
-			throws IllegalArgumentException {
-		this(bolt, null, Sets.newHashSet(rawOutputs));
-	}
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
-	 * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
-	 * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
-	 * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [1;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
-		this(bolt, null, rawOutputs);
-	}
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
-	 * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
-	 * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
-	 * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @param inputSchema
-	 *            The schema (ie, ordered field names) of the input stream.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-			final String[] rawOutputs) throws IllegalArgumentException {
-		this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
-	}
-
-	/**
-	 * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
-	 * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
-	 * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
-	 * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
-	 * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
-	 * 
-	 * @param bolt
-	 *            The Storm {@link IRichBolt bolt} to be used.
-	 * @param inputSchema
-	 *            The schema (ie, ordered field names) of the input stream.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
-			final Collection<String> rawOutputs) throws IllegalArgumentException {
-		this.bolt = bolt;
-		this.inputSchema = inputSchema;
-		this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
-	}
-
-	/**
-	 * Sets the original Storm topology.
-	 * 
-	 * @param stormTopology
-	 *            The original Storm topology.
-	 */
-	public void setStormTopology(StormTopology stormTopology) {
-		this.stormTopology = stormTopology;
-	}
-
-	@Override
-	public void open(final Configuration parameters) throws Exception {
-		super.open(parameters);
-
-		final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
-				super.runtimeContext, this.bolt, this.stormTopology, null);
-		flinkCollector = new TimestampedCollector<OUT>(output);
-		OutputCollector stormCollector = null;
-
-		if (this.numberOfAttributes.size() > 0) {
-			stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
-					this.numberOfAttributes, flinkCollector));
-		}
-
-		GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
-		StormConfig stormConfig = new StormConfig();
-
-		if (config != null) {
-			if (config instanceof StormConfig) {
-				stormConfig = (StormConfig) config;
-			} else {
-				stormConfig.putAll(config.toMap());
-			}
-		}
-
-		this.bolt.prepare(stormConfig, topologyContext, stormCollector);
-	}
-
-	@Override
-	public void dispose() {
-		this.bolt.cleanup();
-	}
-
-	@SuppressWarnings("unchecked")
-	@Override
-	public void processElement(final StreamRecord<IN> element) throws Exception {
-		flinkCollector.setTimestamp(element.getTimestamp());
-		IN value = element.getValue();
-		if (value instanceof SplitStreamType) {
-			this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
-					inputSchema));
-		} else {
-			this.bolt.execute(new StormTuple<IN>(value, inputSchema));
-		}
-	}
-
-	@Override
-	public void processWatermark(Watermark mark) throws Exception {
-		output.emitWatermark(mark);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
deleted file mode 100644
index 45eb56c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls {@link IRichSpout#nextTuple()
- * nextTuple()} for finite number of times before
- * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)} returns. The number of
- * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined
- * case, the {@code run(...)} method return if no record was emitted to the output collector for the first time.
- */
-public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
-	private static final long serialVersionUID = 3883246587044801286L;
-
-	/** The number of {@link IRichSpout#nextTuple()} calls */
-	private int numberOfInvocations;
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
-	 * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
-	 * attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
-		this(spout, (Collection<String>) null, -1);
-	}
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
-	 * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param numberOfInvocations
-	 *            The number of calls to {@link IRichSpout#nextTuple()}.
-	 * @throws IllegalArgumentException
-	 *             If the number of declared output attributes is not with range [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
-			throws IllegalArgumentException {
-		this(spout, (Collection<String>) null, numberOfInvocations);
-	}
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
-	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
-	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
-	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
-			throws IllegalArgumentException {
-		this(spout, Sets.newHashSet(rawOutputs), -1);
-	}
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
-	 * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
-	 * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
-	 * {@link Tuple25} depending on the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
-			throws IllegalArgumentException {
-		this(spout, rawOutputs, -1);
-	}
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
-	 * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
-	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
-	 * the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @param numberOfInvocations
-	 *            The number of calls to {@link IRichSpout#nextTuple()}.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
-			final int numberOfInvocations) throws IllegalArgumentException {
-		super(spout, Sets.newHashSet(rawOutputs));
-		this.numberOfInvocations = numberOfInvocations;
-	}
-
-	/**
-	 * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
-	 * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
-	 * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
-	 * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
-	 * the spout's declared number of attributes.
-	 * 
-	 * @param spout
-	 *            The Storm {@link IRichSpout spout} to be used.
-	 * @param rawOutputs
-	 *            Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
-	 *            of a raw type.
-	 * @param numberOfInvocations
-	 *            The number of calls to {@link IRichSpout#nextTuple()}.
-	 * @throws IllegalArgumentException
-	 *             If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
-	 *             {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
-	 *             [0;25].
-	 */
-	public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs,
-			final int numberOfInvocations) throws IllegalArgumentException {
-		super(spout, rawOutputs);
-		this.numberOfInvocations = numberOfInvocations;
-	}
-
-	/**
-	 * Calls {@link IRichSpout#nextTuple()} for the given number of times.
-	 */
-	@Override
-	protected void execute() {
-		if (this.numberOfInvocations >= 0) {
-			while ((--this.numberOfInvocations >= 0) && super.isRunning) {
-				super.spout.nextTuple();
-			}
-		} else {
-			do {
-				super.collector.tupleEmitted = false;
-				super.spout.nextTuple();
-			} while (super.collector.tupleEmitted && super.isRunning);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
deleted file mode 100644
index 5a20056..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm
- * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
- * Flink tuples and emits them via the provide {@link SourceContext} object.
- */
-class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
-
-	/** The Flink source context object */
-	private final SourceContext<OUT> flinkContext;
-
-	/**
-	 * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink source context. If the
-	 * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
-	 * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
-	 * 
-	 * @param numberOfAttributes
-	 *            The number of attributes of the emitted tuples.
-	 * @param flinkContext
-	 *            The Flink source context to be used.
-	 * @throws UnsupportedOperationException
-	 *             if the specified number of attributes is greater than 25
-	 */
-	public StormSpoutCollector(final HashMap<String, Integer> numberOfAttributes,
-			final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
-		super(numberOfAttributes);
-		assert (flinkContext != null);
-		this.flinkContext = flinkContext;
-	}
-
-	@Override
-	protected List<Integer> doEmit(final OUT flinkTuple) {
-		this.flinkContext.collect(flinkTuple);
-		// TODO
-		return null;
-	}
-
-	@Override
-	public void reportError(final Throwable error) {
-		// not sure, if Flink can support this
-		throw new UnsupportedOperationException("Not implemented yet");
-	}
-
-	@Override
-	public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
-		return this.tansformAndEmit(streamId, tuple);
-	}
-
-
-	@Override
-	public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
-		throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-	}
-
-}