You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/15 11:33:10 UTC

[20/27] flink git commit: [storm-compat] Moved Storm-compatibility to flink-contrib and split flink-contrib into small sub-projects

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
deleted file mode 100755
index a2986a3..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/Tweet.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet;
-
-import org.apache.flink.contrib.tweetinputformat.model.User.Users;
-import org.apache.flink.contrib.tweetinputformat.model.places.Places;
-import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.Entities;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class Tweet {
-
-	private List<Contributors> contributors;
-
-	private Coordinates coordinates;
-
-	private String created_at = "";
-
-	private Entities entities;
-
-	private long favorite_count;
-
-	private boolean favorited;
-
-	private String filter_level = "";
-
-	private long id;
-
-	private String id_str = "";
-
-	private String in_reply_to_screen_name = "";
-
-	private long in_reply_to_status_id;
-
-	private String in_reply_to_status_id_str = "";
-
-	private long in_reply_to_user_id;
-
-	private String in_reply_to_user_id_str = "";
-
-	private String lang = "";
-
-	// Places
-	private Places place;
-
-	private boolean possibly_sensitive;
-
-	private long retweet_count;
-
-	private boolean retweeted;
-
-	private CurrentUserRetweet currentUserRetweet;
-
-	private String source = "";
-
-	private String text = "";
-
-	private boolean truncated;
-
-	private Users user;
-
-	// to handle retweeted_status
-	private Tweet retweeted_status;
-
-	private int tweetLevel;
-
-	public Tweet() {
-		tweetLevel = 0;
-		reset(tweetLevel);
-	}
-
-	public Tweet(int level) {
-		tweetLevel = level;
-		reset(tweetLevel);
-	}
-
-
-	// to avoid FLINK KRYO serializer problem
-	public void reset(int level) {
-
-		contributors = new ArrayList<Contributors>();
-		coordinates = new Coordinates();
-		created_at = "";
-		entities = new Entities();
-		favorite_count = 0L;
-		favorited = false;
-		filter_level = "";
-		id = 0L;
-		id_str = "";
-		in_reply_to_screen_name = "";
-		in_reply_to_status_id = 0L;
-		in_reply_to_status_id_str = "";
-		in_reply_to_user_id = 0L;
-		in_reply_to_user_id_str = "";
-		lang = "";
-		place = new Places();
-		possibly_sensitive = false;
-		retweet_count = 0L;
-
-		// to handle retweeted_status
-		if (level == 0) {
-			retweeted_status = new Tweet(++level);
-		}
-
-
-		currentUserRetweet = new CurrentUserRetweet();
-		retweeted = false;
-		source = "";
-		text = "";
-		truncated = false;
-		user = new Users();
-
-	}
-
-	public List<Contributors> getContributors() {
-		return contributors;
-	}
-
-	public void setContributors(List<Contributors> contributors) {
-		this.contributors = contributors;
-	}
-
-	public Coordinates getCoordinates() {
-		return coordinates;
-	}
-
-	public void setCoordinates(Coordinates coordinates) {
-		this.coordinates = coordinates;
-	}
-
-	public String getCreated_at() {
-		return created_at;
-	}
-
-	public void setCreated_at(String created_at) {
-		this.created_at = created_at;
-	}
-
-	public Entities getEntities() {
-		return entities;
-	}
-
-	public void setEntities(Entities entities) {
-		this.entities = entities;
-	}
-
-	public long getFavorite_count() {
-		return favorite_count;
-	}
-
-	public void setFavorite_count(long favorite_count) {
-		this.favorite_count = favorite_count;
-	}
-
-	public boolean isFavorited() {
-		return favorited;
-	}
-
-	public void setFavorited(boolean favorited) {
-		this.favorited = favorited;
-	}
-
-	public String getFilter_level() {
-		return filter_level;
-	}
-
-	public void setFilter_level(String filter_level) {
-		this.filter_level = filter_level;
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public void setId(long id) {
-		this.id = id;
-	}
-
-	public String getId_str() {
-		return id_str;
-	}
-
-	public void setId_str(String id_str) {
-		this.id_str = id_str;
-	}
-
-	public String getIn_reply_to_screen_name() {
-		return in_reply_to_screen_name;
-	}
-
-	public void setIn_reply_to_screen_name(String in_reply_to_screen_name) {
-		this.in_reply_to_screen_name = in_reply_to_screen_name;
-	}
-
-
-	public long getIn_reply_to_status_id() {
-		return in_reply_to_status_id;
-	}
-
-	public void setIn_reply_to_status_id(long in_reply_to_status_id) {
-		this.in_reply_to_status_id = in_reply_to_status_id;
-	}
-
-	public String getIn_reply_to_status_id_str() {
-		return in_reply_to_status_id_str;
-	}
-
-	public void setIn_reply_to_status_id_str(String in_reply_to_status_id_str) {
-		this.in_reply_to_status_id_str = in_reply_to_status_id_str;
-	}
-
-	public long getIn_reply_to_user_id() {
-		return in_reply_to_user_id;
-	}
-
-	public void setIn_reply_to_user_id(long in_reply_to_user_id) {
-		this.in_reply_to_user_id = in_reply_to_user_id;
-	}
-
-	public String getIn_reply_to_user_id_str() {
-		return in_reply_to_user_id_str;
-	}
-
-	public void setIn_reply_to_user_id_str(String in_reply_to_user_id_str) {
-		this.in_reply_to_user_id_str = in_reply_to_user_id_str;
-	}
-
-	public String getLang() {
-		return lang;
-	}
-
-	public void setLang(String lang) {
-		this.lang = lang;
-	}
-
-	public Places getPlace() {
-		return place;
-	}
-
-	public void setPlace(Places place) {
-		this.place = place;
-	}
-
-	public boolean getPossibly_sensitive() {
-		return possibly_sensitive;
-	}
-
-	public void setPossibly_sensitive(boolean possibly_sensitive) {
-		this.possibly_sensitive = possibly_sensitive;
-	}
-
-	public long getRetweet_count() {
-		return retweet_count;
-	}
-
-	public void setRetweet_count(long retweet_count) {
-		this.retweet_count = retweet_count;
-	}
-
-	public boolean isRetweeted() {
-		return retweeted;
-	}
-
-	public void setRetweeted(boolean retweeted) {
-		this.retweeted = retweeted;
-	}
-
-	public String getSource() {
-		return source;
-	}
-
-	public void setSource(String source) {
-		this.source = source;
-	}
-
-	public String getText() {
-		return text;
-	}
-
-	public void setText(String text) {
-		this.text = text;
-	}
-
-	public boolean isTruncated() {
-		return truncated;
-	}
-
-	public void setTruncated(boolean truncated) {
-		this.truncated = truncated;
-	}
-
-	public Users getUser() {
-		return user;
-	}
-
-	public void setUser(Users user) {
-		this.user = user;
-	}
-
-	public CurrentUserRetweet getCurrentUserRetweet() {
-		return currentUserRetweet;
-	}
-
-	public void setCurrentUserRetweet(CurrentUserRetweet currentUserRetweet) {
-		this.currentUserRetweet = currentUserRetweet;
-	}
-
-
-	public boolean isPossibly_sensitive() {
-		return possibly_sensitive;
-	}
-
-	public Tweet getRetweeted_status() {
-		return retweeted_status;
-	}
-
-	public void setRetweeted_status(Tweet retweeted_status) {
-		this.retweeted_status = retweeted_status;
-	}
-
-	public int getTweetLevel() {
-		return tweetLevel;
-	}
-
-	public void setTweetLevel(int tweetLevel) {
-		this.tweetLevel = tweetLevel;
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
deleted file mode 100755
index d88ea34..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Entities.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Entities which have been parsed out of the text of the
- * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
- */
-public class Entities {
-
-	private List<HashTags> hashtags;
-
-	private List<Media> media;
-
-	private List<URL> urls;
-
-	private List<UserMention> user_mentions;
-
-	private List<Symbol> symbols;
-
-	public Entities() {
-
-		hashtags = new ArrayList<HashTags>();
-		media = new ArrayList<Media>();
-		urls = new ArrayList<URL>();
-		user_mentions = new ArrayList<UserMention>();
-		symbols = new ArrayList<Symbol>();
-
-	}
-
-	public List<HashTags> getHashtags() {
-		return hashtags;
-	}
-
-	public void setHashtags(List<HashTags> hashtags) {
-		this.hashtags = hashtags;
-	}
-
-	public List<Media> getMedia() {
-		return media;
-	}
-
-	public void setMedia(List<Media> media) {
-		this.media = media;
-	}
-
-	public List<URL> getUrls() {
-		return urls;
-	}
-
-	public void setUrls(List<URL> urls) {
-		this.urls = urls;
-	}
-
-	public List<UserMention> getUser_mentions() {
-		return user_mentions;
-	}
-
-	public void setUser_mentions(List<UserMention> user_mentions) {
-		this.user_mentions = user_mentions;
-	}
-
-
-	public List<Symbol> getSymbols() {
-		return symbols;
-	}
-
-	public void setSymbols(List<Symbol> symbols) {
-		this.symbols = symbols;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
deleted file mode 100755
index 1900859..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/HashTags.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-/**
- * Represents hashtags which have been parsed out of the
- * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text.
- */
-
-public class HashTags {
-
-	private long[] indices = new long[2];
-
-	private String text = "";
-
-
-	public long[] getIndices() {
-		return indices;
-	}
-
-	public void setIndices(long[] indices) {
-		this.indices = indices;
-	}
-
-	public void setIndices(long start, long end) {
-		this.indices[0] = start;
-		this.indices[1] = end;
-
-	}
-
-	public String getText() {
-		return text;
-	}
-
-	public void setText(String text, boolean hashExist) {
-		if (hashExist) {
-			this.text = text.substring((int) indices[0] + 1);
-		} else {
-			this.text = text;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
deleted file mode 100755
index f006aac..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Media.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Represents media elements uploaded with the {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
- */
-public class Media {
-
-
-	private String display_url = "";
-
-	private String expanded_url = "";
-
-	private long id;
-
-	private String id_str = "";
-
-	private long[] indices;
-
-	private String media_url = "";
-
-	private String media_url_https = "";
-
-	private Map<String, Size> sizes;
-
-	private String type = "";
-
-	private String url = "";
-
-	public Media() {
-
-		this.display_url = "";
-		this.expanded_url = "";
-		this.id = 0L;
-		this.id_str = "";
-		this.setIndices(new long[]{0L, 0L});
-		this.media_url = "";
-		this.media_url_https = "";
-		this.sizes = new HashMap<String, Size>();
-		this.type = "";
-		this.url = "";
-
-	}
-
-	public String getDisplay_url() {
-		return display_url;
-	}
-
-	public void setDisplay_url(String display_url) {
-		this.display_url = display_url;
-	}
-
-	public String getExpanded_url() {
-		return expanded_url;
-	}
-
-	public void setExpanded_url(String expanded_url) {
-		this.expanded_url = expanded_url;
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public void setId(long id) {
-		this.id = id;
-	}
-
-	public String getId_str() {
-		return id_str;
-	}
-
-	public void setId_str(String id_str) {
-		this.id_str = id_str;
-	}
-
-	public long[] getIndices() {
-		return indices;
-	}
-
-	public void setIndices(long[] indices) {
-		this.indices = indices;
-	}
-
-	public String getMedia_url() {
-		return media_url;
-	}
-
-	public void setMedia_url(String media_url) {
-		this.media_url = media_url;
-	}
-
-	public String getMedia_url_https() {
-		return media_url_https;
-	}
-
-	public void setMedia_url_https(String media_url_https) {
-		this.media_url_https = media_url_https;
-	}
-
-	public Map<String, Size> getSizes() {
-		return sizes;
-	}
-
-	public void setSizes(Map<String, Size> sizes) {
-		this.sizes = sizes;
-	}
-
-	public String getType() {
-		return type;
-	}
-
-	public void setType(String type) {
-		this.type = type;
-	}
-
-	public String getUrl() {
-		return url;
-	}
-
-	public void setUrl(String url) {
-		this.url = url;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java
deleted file mode 100755
index 4f9b81c..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Size.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-/**
- * An object showing available sizes for the media file.
- */
-public class Size {
-
-	private long w;
-
-	private long h;
-
-	private String resize = "";
-
-
-	public Size(long width, long height, String resize) {
-
-		this.w = width;
-		this.h = height;
-		this.resize = resize;
-
-	}
-
-
-	public long getWidth() {
-		return w;
-	}
-
-	public void setWidth(long width) {
-		this.w = width;
-	}
-
-	public long getHeight() {
-		return h;
-	}
-
-	public void setHeight(long height) {
-		this.h = height;
-	}
-
-	public String getResize() {
-		return resize;
-	}
-
-	public void setResize(String resize) {
-		this.resize = resize;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java
deleted file mode 100755
index db562e8..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/Symbol.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-/**
- * Represents a financial symbol (starting with the dollar sign) extracted from the
- * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet} text.
- */
-
-public class Symbol {
-
-	private String text = "";
-
-	private long[] indices;
-
-	public Symbol() {
-		this.text = "";
-		this.setIndices(new long[]{0L, 0L});
-
-	}
-
-	public String getText() {
-		return text;
-	}
-
-	public void setText(String text) {
-		this.text = text;
-	}
-
-	public long[] getIndices() {
-		return indices;
-	}
-
-	public void setIndices(long[] indices) {
-		this.indices = indices;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java
deleted file mode 100755
index 6d0f184..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/URL.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-/**
- * Represents URLs included in the text of a Tweet or within textual fields of a
- * {@link org.apache.flink.contrib.tweetinputformat.model.User.Users} object.
- */
-public class URL {
-
-	private String url = "";
-
-	private String display_url = "";
-
-	private String expanded_url = "";
-
-	private long[] indices;
-
-	public URL() {
-		this.setIndices(new long[]{0L, 0L});
-	}
-
-
-	public String getUrl() {
-		return url;
-	}
-
-	public void setUrl(String url) {
-		this.url = url;
-	}
-
-	public String getDisplay_url() {
-		return display_url;
-	}
-
-	public void setDisplay_url(String display_url) {
-		this.display_url = display_url;
-	}
-
-	public String getExpanded_url() {
-		return expanded_url;
-	}
-
-	public void setExpanded_url(String expanded_url) {
-		this.expanded_url = expanded_url;
-	}
-
-	public long[] getIndices() {
-		return indices;
-	}
-
-	public void setIndices(long[] indices) {
-		this.indices = indices;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java b/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java
deleted file mode 100755
index a56f7c7..0000000
--- a/flink-contrib/src/main/java/org/apache/flink/contrib/tweetinputformat/model/tweet/entities/UserMention.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.contrib.tweetinputformat.model.tweet.entities;
-
-/**
- * Represents other Twitter users mentioned in the text of the
- * {@link org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet}.
- */
-public class UserMention {
-
-	private long id;
-
-	private String id_str = "";
-
-	private String screen_name = "";
-
-	private String name = "";
-
-	private long[] indices;
-
-	public UserMention() {
-
-		this.setIndices(new long[]{0L, 0L});
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public void setId(long id) {
-		this.id = id;
-	}
-
-	public String getId_str() {
-		return id_str;
-	}
-
-	public void setId_str() {
-		this.id_str = Long.toString(id);
-	}
-
-	public String getScreen_name() {
-		return screen_name;
-	}
-
-	public void setScreen_name(String screen_name) {
-		this.screen_name = screen_name;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	public long[] getIndices() {
-		return indices;
-	}
-
-	public void setIndices(long[] indices) {
-		this.indices = indices;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/resources/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/resources/.hidden b/flink-contrib/src/main/resources/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/main/resources/HashTagTweetSample.json
----------------------------------------------------------------------
diff --git a/flink-contrib/src/main/resources/HashTagTweetSample.json b/flink-contrib/src/main/resources/HashTagTweetSample.json
deleted file mode 100644
index ddf8f75..0000000
--- a/flink-contrib/src/main/resources/HashTagTweetSample.json
+++ /dev/null
@@ -1,4 +0,0 @@
-{"created_at":"Mon Jan 1 00:00:00 +0000 1901","id":100000000000000000,"id_str":"100000000000000000","text":"Apache Flink","source":null,"truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1000000000,"id_str":"1000000000","name":"Apache Flink","screen_name":"Apache Flink","location":"Berlin","url":null,"description":null,"protected":false,"verified":false,"followers_count":999,"friends_count":999,"listed_count":9,"favourites_count":9999,"statuses_count":99999,"created_at":"Mon Jan 1 00:00:00 +0000 1901","utc_offset":7200,"time_zone":"Amsterdam","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":null,"profile_background_image_url_https":null,"profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fil
 l_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":null,"profile_image_url_https":null,"profile_banner_url":null,"default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"example","indices":[0,16]},{"text":"tweet","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,,"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"}
-{"created_at":"Mon Jan 1 00:00:00 +0000 1901","id":200000000000000000,"id_str":"200000000000000000","text":"Apache Flink","source":null,"truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1000000000,"id_str":"1000000000","name":"Apache Flink","screen_name":"Apache Flink","location":"Berlin","url":null,"description":null,"protected":false,"verified":false,"followers_count":999,"friends_count":999,"listed_count":9,"favourites_count":9999,"statuses_count":99999,"created_at":"Mon Jan 1 00:00:00 +0000 1901","utc_offset":7200,"time_zone":"Amsterdam","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":null,"profile_background_image_url_https":null,"profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fil
 l_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":null,"profile_image_url_https":null,"profile_banner_url":null,"default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"example","indices":[0,16]},{"text":"tweet","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,,"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"}
-{"created_at":"Mon Jan 1 00:00:00 +0000 1901","id":300000000000000000,"id_str":"300000000000000000","text":"Apache Flink","source":null,"truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1000000000,"id_str":"1000000000","name":"Apache Flink","screen_name":"Apache Flink","location":"Berlin","url":null,"description":null,"protected":false,"verified":false,"followers_count":999,"friends_count":999,"listed_count":9,"favourites_count":9999,"statuses_count":99999,"created_at":"Mon Jan 1 00:00:00 +0000 1901","utc_offset":7200,"time_zone":"Amsterdam","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":null,"profile_background_image_url_https":null,"profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fil
 l_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":null,"profile_image_url_https":null,"profile_banner_url":null,"default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"last","indices":[0,16]},{"text":"example","indices":[118,125]},{"text":"that","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,,"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"}
-{"created_at":"Mon Jan 1 00:00:00 +0000 1901","id":400000000000000000,"id_str":"400000000000000000","text":"Apache Flink","source":null,"truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":1000000000,"id_str":"1000000000","name":"Apache Flink","screen_name":"Apache Flink","location":"Berlin","url":null,"description":null,"protected":false,"verified":false,"followers_count":999,"friends_count":999,"listed_count":9,"favourites_count":9999,"statuses_count":99999,"created_at":"Mon Jan 1 00:00:00 +0000 1901","utc_offset":7200,"time_zone":"Amsterdam","geo_enabled":true,"lang":"en","contributors_enabled":false,"is_translator":false,"profile_background_color":"022330","profile_background_image_url":null,"profile_background_image_url_https":null,"profile_background_tile":true,"profile_link_color":"0084B4","profile_sidebar_border_color":"000000","profile_sidebar_fil
 l_color":"DDEEF6","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":null,"profile_image_url_https":null,"profile_banner_url":null,"default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweet_count":4,"favorite_count":1,"entities":{"hashtags":[{"text":"d12","indices":[0,16]},{"text":"how_to","indices":[118,125]}],"trends":[],"urls":[],"user_mentions":[],"symbols":[],"media":[]},"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"low","lang":"en"},"retweet_count":0,"favorite_count":0,,"favorited":false,"retweeted":false,"possibly_sensitive":false,"filter_level":"medium","lang":"en"}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/test/java/org/apache/flink/contrib/.hidden
----------------------------------------------------------------------
diff --git a/flink-contrib/src/test/java/org/apache/flink/contrib/.hidden b/flink-contrib/src/test/java/org/apache/flink/contrib/.hidden
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java b/flink-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
deleted file mode 100644
index f21e58c..0000000
--- a/flink-contrib/src/test/java/org/apache/flink/contrib/streaming/CollectITCase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming;
-
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.Test;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.streaming.DataStreamUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.junit.Assert;
-
-import java.util.Iterator;
-
-/**
- * This test verifies the behavior of DataStreamUtils.collect.
- */
-public class CollectITCase {
-
-	@Test
-	public void testCollect() {
-
-		Configuration config = new Configuration();
-		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, false);
-
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
-
-		long N = 10;
-		DataStream<Long> stream = env.generateSequence(1, N);
-
-		long i = 1;
-		for(Iterator it = DataStreamUtils.collect(stream); it.hasNext(); ) {
-			Long x = (Long) it.next();
-			if(x != i) {
-				Assert.fail(String.format("Should have got %d, got %d instead.", i, x));
-			}
-			i++;
-		}
-		if(i != N + 1) {
-			Assert.fail(String.format("Should have collected %d numbers, got %d instead.", N, i - 1));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-contrib/src/test/java/org/apache/flink/contrib/tweetinputformat/SimpleTweetInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/src/test/java/org/apache/flink/contrib/tweetinputformat/SimpleTweetInputFormatTest.java b/flink-contrib/src/test/java/org/apache/flink/contrib/tweetinputformat/SimpleTweetInputFormatTest.java
deleted file mode 100644
index 29f1fb5..0000000
--- a/flink-contrib/src/test/java/org/apache/flink/contrib/tweetinputformat/SimpleTweetInputFormatTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.tweetinputformat;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.contrib.tweetinputformat.io.SimpleTweetInputFormat;
-import org.apache.flink.contrib.tweetinputformat.model.tweet.Tweet;
-import org.apache.flink.contrib.tweetinputformat.model.tweet.entities.HashTags;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-
-public class SimpleTweetInputFormatTest {
-
-	private Tweet tweet;
-
-	private SimpleTweetInputFormat simpleTweetInputFormat;
-
-	private FileInputSplit fileInputSplit;
-
-	protected Configuration config;
-
-	protected File tempFile;
-
-
-	@Before
-	public void testSetUp() {
-
-
-		simpleTweetInputFormat = new SimpleTweetInputFormat();
-
-		File jsonFile = new File("../flink-contrib/src/main/resources/HashTagTweetSample.json");
-
-		fileInputSplit = new FileInputSplit(0, new Path(jsonFile.getPath()), 0, jsonFile.length(), new String[]{"localhost"});
-	}
-
-	@Test
-	public void testTweetInput() throws Exception {
-
-
-		simpleTweetInputFormat.open(fileInputSplit);
-		List<String> result;
-
-		int i = 0;
-		while (i < 4) {
-			i++;
-			tweet = new Tweet();
-			tweet = simpleTweetInputFormat.nextRecord(tweet);
-
-			if (tweet != null) {
-				result = new ArrayList<String>();
-				for (Iterator<HashTags> iterator = tweet.getEntities().getHashtags().iterator(); iterator.hasNext(); ) {
-					result.add(iterator.next().getText());
-				}
-
-				if (tweet.getId_str().equals("100000000000000000")) {
-					Assert.assertArrayEquals(new String[]{"example", "tweet"}, result.toArray());
-				} else if (tweet.getId_str().equals("200000000000000000")) {
-					Assert.assertArrayEquals(new String[]{"example", "tweet"}, result.toArray());
-				} else if (tweet.getId_str().equals("300000000000000000")) {
-					Assert.assertArrayEquals(new String[]{"last", "example", "that"}, result.toArray());
-				} else if (tweet.getId_str().equals("400000000000000000")) {
-					Assert.assertArrayEquals(new String[]{"d12", "how_to"}, result.toArray());
-				}
-			}
-		}
-
-		tweet = new Tweet();
-		tweet = simpleTweetInputFormat.nextRecord(tweet);
-		Assert.assertNull(tweet);
-		Assert.assertTrue(simpleTweetInputFormat.reachedEnd());
-
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/README.md
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/README.md b/flink-staging/flink-streaming/flink-storm-compatibility/README.md
deleted file mode 100644
index 0d490a3..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/README.md
+++ /dev/null
@@ -1,15 +0,0 @@
-# flink-storm-compatibility
-
-The Storm compatibility layer allows embedding spouts or bolts unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.
-
-The following Storm features are not (yet/fully) supported by the compatibility layer right now:
-* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (i.e., `Map conf` parameter)
-* topology and tuple meta information (i.e., `TopologyContext` not fully supported)
-* access to tuple attributes (i.e., fields) only by index (access by name is coming)
-* only default stream is supported currently (i.e., only a single output stream)
-* no fault-tolerance guarantees (i.e., calls to `ack()`/`fail()` and anchoring are ignored)
-* for whole Storm topologies the following is not supported by Flink:
-  * direct emit connection pattern
-  * activating/deactivating and rebalancing of topologies
-  * task hooks
-  * custom metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/pom.xml b/flink-staging/flink-streaming/flink-storm-compatibility/pom.xml
deleted file mode 100644
index 3fcc4bc..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/pom.xml
+++ /dev/null
@@ -1,105 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.9-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-storm-compatibility</artifactId>
-	<name>flink-storm-compatibility</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.storm</groupId>
-			<artifactId>storm-core</artifactId>
-			<version>0.9.4</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>log4j-over-slf4j</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-		
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-		
-		<pluginManagement>
-			<plugins>
-				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
-				<plugin>
-					<groupId>org.eclipse.m2e</groupId>
-					<artifactId>lifecycle-mapping</artifactId>
-					<version>1.0.0</version>
-					<configuration>
-						<lifecycleMappingMetadata>
-							<pluginExecutions>
-								<pluginExecution>
-									<pluginExecutionFilter>
-										<groupId>org.apache.maven.plugins</groupId>
-										<artifactId>maven-dependency-plugin</artifactId>
-										<versionRange>[2.9,)</versionRange>
-										<goals>
-											<goal>unpack</goal>
-										</goals>
-									</pluginExecutionFilter>
-									<action>
-										<ignore/>
-									</action>
-								</pluginExecution>
-							</pluginExecutions>
-						</lifecycleMappingMetadata>
-					</configuration>
-				</plugin>
-			</plugins>
-		</pluginManagement>
-
-	</build>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
deleted file mode 100644
index 242c154..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkClient.java
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
-import backtype.storm.Config;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.NotAliveException;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.client.program.Client;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.JobManager;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
-import scala.Some;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link FlinkClient} mimics a Storm {@link NimbusClient} and {@link Nimbus}{@code .Client} at once, to interact with
- * Flink's JobManager instead of Storm's Nimbus.
- */
-public class FlinkClient {
-
-	//The jobmanager's host name
-	private final String jobManagerHost;
-	//The jobmanager's rpc port
-	private final int jobManagerPort;
-	//The user specified timeout in milliseconds
-	private final String timeout;
-
-	// The following methods are derived from "backtype.storm.utils.NimbusClient"
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 */
-	public FlinkClient(final String host, final int port) {
-		this(host, port, null);
-	}
-
-	/**
-	 * Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. Values for {@link
-	 * Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
-	 *
-	 * @param conf
-	 * 		A configuration.
-	 * @param host
-	 * 		The jobmanager's host name.
-	 * @param port
-	 * 		The jobmanager's rpc port.
-	 * @param timeout
-	 * 		Timeout
-	 */
-	public FlinkClient(final String host, final int port, final Integer timeout) {
-		this.jobManagerHost = host;
-		this.jobManagerPort = port;
-		if (timeout != null) {
-			this.timeout = timeout + " ms";
-		} else {
-			this.timeout = null;
-		}
-	}
-
-	/**
-	 * Returns a {@link FlinkClient} that uses the configured {@link Config#NIMBUS_HOST} and {@link
-	 * Config#NIMBUS_THRIFT_PORT} as JobManager address.
-	 *
-	 * @param conf
-	 * 		Configuration that contains the jobmanager's hostname and port.
-	 * @return A configured {@link FlinkClient}.
-	 */
-	@SuppressWarnings("rawtypes")
-	public static FlinkClient getConfiguredClient(final Map conf) {
-		final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
-		final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
-		return new FlinkClient(nimbusHost, nimbusPort);
-	}
-
-	/**
-	 * Return a reference to itself.
-	 * <p/>
-	 * {@link FlinkClient} mimics both, {@link NimbusClient} and {@link Nimbus}{@code .Client}, at once.
-	 *
-	 * @return A reference to itself.
-	 */
-	public FlinkClient getClient() {
-		return this;
-	}
-
-	public void close() {/* nothing to do */}
-
-	// The following methods are derived from "backtype.storm.generated.Nimbus.Client"
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopology(final String name, final String uploadedJarLocation, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		this.submitTopologyWithOpts(name, uploadedJarLocation, topology);
-	}
-
-	/**
-	 * Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
-	 * uploading a jar file beforehand. Jar files are always uploaded directly when a program is submitted.
-	 */
-	public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
-			topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-
-		if (this.getTopologyJobId(name) != null) {
-			throw new AlreadyAliveException();
-		}
-
-		final File uploadedJarFile = new File(uploadedJarLocation);
-		try {
-			JobWithJars.checkJarFile(uploadedJarFile);
-		} catch (final IOException e) {
-			throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
-		}
-
-		final List<File> jarFiles = new ArrayList<File>();
-		jarFiles.add(uploadedJarFile);
-
-		final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
-		jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));
-
-		final Configuration configuration = jobGraph.getJobConfiguration();
-
-		final Client client;
-		try {
-			client = new Client(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort), configuration,
-					JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()), -1);
-		} catch (final UnknownHostException e) {
-			throw new RuntimeException("Cannot execute job due to UnknownHostException", e);
-		}
-
-		try {
-			client.run(jobGraph, false);
-		} catch (final ProgramInvocationException e) {
-			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
-		}
-	}
-
-	public void killTopology(final String name) throws NotAliveException {
-		this.killTopologyWithOpts(name, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) throws NotAliveException {
-		final JobID jobId = this.getTopologyJobId(name);
-		if (jobId == null) {
-			throw new NotAliveException();
-		}
-
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			if (options != null) {
-				try {
-					Thread.sleep(1000 * options.get_wait_secs());
-				} catch (final InterruptedException e) {
-					throw new RuntimeException(e);
-				}
-			}
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(askTimeout));
-			try {
-				Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Killing topology " + name + " with Flink job ID " + jobId + " failed", e);
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
-		}
-	}
-
-	/**
-	 * Package internal method to get a Flink {@link JobID} from a Storm topology name.
-	 *
-	 * @param id
-	 * 		The Storm topology name.
-	 * @return Flink's internally used {@link JobID}.
-	 */
-	JobID getTopologyJobId(final String id) {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
-		}
-
-		try {
-			final ActorRef jobManager = this.getJobManager();
-
-			final FiniteDuration askTimeout = this.getTimeout();
-			final Future<Object> response = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus(),
-					new Timeout(askTimeout));
-
-			Object result;
-			try {
-				result = Await.result(response, askTimeout);
-			} catch (final Exception e) {
-				throw new RuntimeException("Could not retrieve running jobs from the JobManager", e);
-			}
-
-			if (result instanceof RunningJobsStatus) {
-				final List<JobStatusMessage> jobs = ((RunningJobsStatus) result).getStatusMessages();
-
-				for (final JobStatusMessage status : jobs) {
-					if (status.getJobName().equals(id)) {
-						return status.getJobId();
-					}
-				}
-			} else {
-				throw new RuntimeException("ReqeustRunningJobs requires a response of type "
-						+ "RunningJobs. Instead the response is of type " + result.getClass() + ".");
-			}
-		} catch (final IOException e) {
-			throw new RuntimeException("Could not connect to Flink JobManager with address " + this.jobManagerHost
-					+ ":" + this.jobManagerPort, e);
-		}
-
-		return null;
-	}
-
-	private FiniteDuration getTimeout() {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-		if (this.timeout != null) {
-			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
-		}
-
-		return AkkaUtils.getTimeout(configuration);
-	}
-
-	private ActorRef getJobManager() throws IOException {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-
-		ActorSystem actorSystem;
-		try {
-			final scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
-			actorSystem = AkkaUtils.createActorSystem(configuration, new Some<scala.Tuple2<String, Object>>(
-					systemEndpoint));
-		} catch (final Exception e) {
-			throw new RuntimeException("Could not start actor system to communicate with JobManager", e);
-		}
-
-		return JobManager.getJobManagerRemoteReference(new InetSocketAddress(this.jobManagerHost, this.jobManagerPort),
-				actorSystem, AkkaUtils.getLookupTimeout(configuration));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
deleted file mode 100644
index e82e97a..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkLocalCluster.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.LocalCluster;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.KillOptions;
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyInfo;
-import org.apache.flink.streaming.util.ClusterUtil;
-
-import java.util.Map;
-
-/**
- * {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
- */
-public class FlinkLocalCluster {
-
-	public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
-			throws Exception {
-		this.submitTopologyWithOpts(topologyName, conf, topology, null);
-	}
-
-	public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
-			final SubmitOptions submitOpts) throws Exception {
-		ClusterUtil
-				.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
-	}
-
-	public void killTopology(final String topologyName) {
-		this.killTopologyWithOpts(topologyName, null);
-	}
-
-	public void killTopologyWithOpts(final String name, final KillOptions options) {
-	}
-
-	public void activate(final String topologyName) {
-	}
-
-	public void deactivate(final String topologyName) {
-	}
-
-	public void rebalance(final String name, final RebalanceOptions options) {
-	}
-
-	public void shutdown() {
-		ClusterUtil.stopOnMiniCluster();
-	}
-
-	public String getTopologyConf(final String id) {
-		return null;
-	}
-
-	public StormTopology getTopology(final String id) {
-		return null;
-	}
-
-	public ClusterSummary getClusterInfo() {
-		return null;
-	}
-
-	public TopologyInfo getTopologyInfo(final String id) {
-		return null;
-	}
-
-	public Map<?, ?> getState() {
-		return null;
-	}
-
-	// A different {@link FlinkLocalCluster} to be used for execution of ITCases
-	private static FlinkLocalCluster currentCluster = null;
-
-	/**
-	 * Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
-	 * #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
-	 *
-	 * @return a {@link FlinkLocalCluster} to be used for execution
-	 */
-	public static FlinkLocalCluster getLocalCluster() {
-		if (currentCluster == null) {
-			currentCluster = new FlinkLocalCluster();
-		}
-
-		return currentCluster;
-	}
-
-	/**
-	 * Sets a different {@link FlinkLocalCluster} to be used for execution.
-	 *
-	 * @param cluster
-	 * 		the {@link FlinkLocalCluster} to be used for execution
-	 */
-	public static void initialize(final FlinkLocalCluster cluster) {
-		currentCluster = cluster;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index d6a0230..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt
- * bolt}.<br />
- * <br />
- * <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore,
- * direct emit is not supported.</strong>
- */
-final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
-	// the declared output schema
-	private Fields outputSchema;
-
-	@Override
-	public void declare(final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code direct} is {@code true}
-	 */
-	@Override
-	public void declare(final boolean direct, final Fields fields) {
-		this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
-	 * {@link Utils#DEFAULT_STREAM_ID}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
-	 */
-	@Override
-	public void declareStream(final String streamId, final Fields fields) {
-		this.declareStream(streamId, false, fields);
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * <p/>
-	 * Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
-	 * {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
-	 * must be {@code false}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
-	 */
-	@Override
-	public void declareStream(final String streamId, final boolean direct, final Fields fields) {
-		if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
-			throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
-		}
-		if (direct) {
-			throw new UnsupportedOperationException("Direct emit is not supported by Flink");
-		}
-
-		this.outputSchema = fields;
-	}
-
-	/**
-	 * Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
-	 * {@code null} is returned.
-	 *
-	 * @return output type information for the declared output schema; or {@code null} if no output schema was declared
-	 * @throws IllegalArgumentException
-	 * 		if more than 25 attributes are declared
-	 */
-	public TypeInformation<?> getOutputType() throws IllegalArgumentException {
-		if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
-			return null;
-		}
-
-		Tuple t;
-		final int numberOfAttributes = this.outputSchema.size();
-
-		if (numberOfAttributes == 1) {
-			return TypeExtractor.getForClass(Object.class);
-		} else if (numberOfAttributes <= 25) {
-			try {
-				t = Tuple.getTupleClass(numberOfAttributes).newInstance();
-			} catch (final InstantiationException e) {
-				throw new RuntimeException(e);
-			} catch (final IllegalAccessException e) {
-				throw new RuntimeException(e);
-			}
-		} else {
-			throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
-		}
-
-		// TODO: declare only key fields as DefaultComparable
-		for (int i = 0; i < numberOfAttributes; ++i) {
-			t.setField(new DefaultComparable(), i);
-		}
-
-		return TypeExtractor.getForObject(t);
-	}
-
-	/**
-	 * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
-	 * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
-	 * Flink cannot use them and will throw an exception.
-	 */
-	private static class DefaultComparable implements Comparable<DefaultComparable> {
-
-		public DefaultComparable() {
-		}
-
-		@Override
-		public int compareTo(final DefaultComparable o) {
-			return 0;
-		}
-	}
-
-	/**
-	 * Computes the indexes within the declared output schema, for a list of given field-grouping attributes.
-	 *
-	 * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
-	 * list
-	 */
-	public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
-		final int[] fieldIndexes = new int[groupingFields.size()];
-
-		for (int i = 0; i < fieldIndexes.length; ++i) {
-			fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
-		}
-
-		return fieldIndexes;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
deleted file mode 100644
index c86ee8a..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkSubmitter.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Map;
-
-/**
- * {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
- */
-public class FlinkSubmitter {
-	public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
-	 *
-	 * @param name
-	 * 		the name of the storm.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @param opts
-	 * 		to manipulate the starting of the topology.
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
-			final SubmitOptions opts)
-			throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or until explicitly killed. The given {@link
-	 * FlinkProgressListener} is ignored because progress bars are not supported by Flink.
-	 *
-	 * @param name
-	 * 		the name of the storm.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @param opts
-	 * 		to manipulate the starting of the topology
-	 * @param progressListener
-	 * 		to track the progress of the jar upload process
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	public static void submitTopology(final String name, final Map stormConf, final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		if (!Utils.isValidConf(stormConf)) {
-			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
-		}
-
-		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
-		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
-			stormConf.put(Config.NIMBUS_HOST,
-					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));
-		}
-		if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
-			stormConf.put(Config.NIMBUS_THRIFT_PORT,
-					new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
-							6123)));
-		}
-
-		final String serConf = JSONValue.toJSONString(stormConf);
-
-		final FlinkClient client = FlinkClient.getConfiguredClient(stormConf);
-		if (client.getTopologyJobId(name) != null) {
-			throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
-		}
-		String localJar = System.getProperty("storm.jar");
-		if (localJar == null) {
-			try {
-				for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
-						.getJars()) {
-					// TODO verify that there is only one jar
-					localJar = file.getAbsolutePath();
-				}
-			} catch (final ClassCastException e) {
-				// ignore
-			}
-		}
-		try {
-			logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
-			client.submitTopologyWithOpts(name, localJar, topology);
-		} catch (final InvalidTopologyException e) {
-			logger.warn("Topology submission exception: " + e.get_msg());
-			throw e;
-		} catch (final AlreadyAliveException e) {
-			logger.warn("Topology already alive exception", e);
-			throw e;
-		} finally {
-			client.close();
-		}
-
-		logger.info("Finished submitting topology: " + name);
-	}
-
-	/**
-	 * Same as {@link #submitTopology(String, Map, FlinkTopology, SubmitOptions)}. Progress bars are not supported by
-	 * Flink.
-	 *
-	 * @param name
-	 * 		the name of the storm.
-	 * @param stormConf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 * 		the processing to execute.
-	 * @param opts
-	 * 		to manipulate the starting of the topology
-	 * @throws AlreadyAliveException
-	 * 		if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 * 		if an invalid topology was submitted
-	 */
-	public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
-			final FlinkTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		submitTopology(name, stormConf, topology);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
-	 * environment.
-	 *
-	 * @param conf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @return the value of parameter localJar
-	 */
-	@SuppressWarnings("rawtypes")
-	public static String submitJar(final Map conf, final String localJar) {
-		return submitJar(localJar);
-	}
-
-	/**
-	 * In Flink, jar files are submitted directly when a program is started. Thus, this method does nothing. The
-	 * returned value is parameter localJar, because this gives the best integration of Storm behavior within a Flink
-	 * environment.
-	 *
-	 * @param conf
-	 * 		the topology-specific configuration. See {@link Config}.
-	 * @param localJar
-	 * 		file path of the jar file to submit
-	 * @param listener
-	 * 		progress listener to track the jar file upload
-	 * @return the value of parameter localJar
-	 */
-	public static String submitJar(final String localJar) {
-		if (localJar == null) {
-			throw new RuntimeException(
-					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
-							"to upload");
-		}
-
-		return localJar;
-	}
-
-	/**
-	 * Dummy interface used to track progress of file upload. Does not do anything. Kept for compatibility.
-	 */
-	public interface FlinkProgressListener {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/12b13f9c/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java b/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
deleted file mode 100644
index 4b7f0dc..0000000
--- a/flink-staging/flink-streaming/flink-storm-compatibility/src/main/java/org/apache/flink/stormcompatibility/api/FlinkTopology.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.api;
-
-import backtype.storm.generated.StormTopology;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/**
- * {@link FlinkTopology} mimics a {@link StormTopology} and is implemented in terms of a {@link
- * StreamExecutionEnvironment} . In contrast to a regular {@link StreamExecutionEnvironment}, a {@link FlinkTopology}
- * cannot be executed directly, but must be handed over to a {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or
- * {@link FlinkClient}.
- */
-class FlinkTopology extends StreamExecutionEnvironment {
-
-	// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
-	private final StormTopology stormTopology;
-	// The number of declared tasks for the whole program (ie, sum over all dops)
-	private int numberOfTasks = 0;
-
-	public FlinkTopology(final StormTopology stormTopology) {
-		// Set default parallelism to 1, to mirror Storm default behavior
-		super.setParallelism(1);
-		this.stormTopology = stormTopology;
-	}
-
-	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter}, or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public JobExecutionResult execute() throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-						"instead.");
-	}
-
-	/**
-	 * Is not supported. In order to execute use {@link FlinkLocalCluster}, {@link FlinkSubmitter} or {@link
-	 * FlinkClient}.
-	 *
-	 * @throws UnsupportedOperationException
-	 * 		at every invocation
-	 */
-	@Override
-	public JobExecutionResult execute(final String jobName) throws Exception {
-		throw new UnsupportedOperationException(
-				"A FlinkTopology cannot be executed directly. Use FlinkLocalCluster, FlinkSubmitter, or FlinkClient " +
-						"instead.");
-	}
-
-	//TODO
-	public String getStormTopologyAsString() {
-		return this.stormTopology.toString();
-	}
-
-	/**
-	 * Increases the number of declared tasks of this program by the given value.
-	 *
-	 * @param dop
-	 * 		The dop of a new operator that increases the number of overall tasks.
-	 */
-	public void increaseNumberOfTasks(final int dop) {
-		assert (dop > 0);
-		this.numberOfTasks += dop;
-	}
-
-	/**
-	 * Returns the number of required tasks to execute this program.
-	 *
-	 * @return the number of required tasks to execute this program
-	 */
-	public int getNumberOfTasks() {
-		return this.numberOfTasks;
-	}
-
-}