Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:23 UTC

[07/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/pom.xml b/flink-streaming-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..df963a3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-connectors-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-connector-twitter</artifactId>
+	<name>flink-connector-twitter</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>com.twitter</groupId>
+			<artifactId>hbc-core</artifactId>
+			<version>2.2.0</version>
+		</dependency>
+		
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<configuration>
+							<artifactSet>
+								<includes combine.children="append">
+									<!-- We include all dependencies that transitively depend on guava -->
+									<include>com.twitter:hbc-core</include>
+									<include>com.twitter:joauth</include>
+								</includes>
+							</artifactSet>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..0f16541
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract class derived from {@link RichFlatMapFunction} to handle JSON strings.
+ * 
+ * @param <IN>
+ *            Type of the input elements.
+ * @param <OUT>
+ *            Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	// private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
+
+	/**
+	 * Get the value object associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The object associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public Object get(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).get("retValue");
+	}
+
+	/**
+	 * Get the boolean value associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The boolean value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public boolean getBoolean(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getBoolean("retValue");
+	}
+
+	/**
+	 * Get the double value associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The double value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public double getDouble(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getDouble("retValue");
+	}
+
+	/**
+	 * Get the int value associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The int value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public int getInt(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getInt("retValue");
+	}
+
+	/**
+	 * Get the long value associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The long value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public long getLong(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+
+		return parser.parse(field).getLong("retValue");
+	}
+	
+	/**
+	 * Get the String value associated with a key from a JSON string. It can find
+	 * embedded fields, too.
+	 * 
+	 * @param jsonText
+	 *            JSON String in which the field is searched.
+	 * @param field
+	 *            The key whose value is searched for.
+	 * @return The String value associated with the field.
+	 * @throws JSONException
+	 *             If the field is not found.
+	 */
+	public String getString(String jsonText, String field) throws JSONException {
+		JSONParser parser = new JSONParser(jsonText);
+		
+		return parser.parse(field).getString("retValue");
+	}
+}
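
A minimal usage sketch of the abstract class above: a concrete subclass only has to implement flatMap() and can rely on the typed getters. The field name "text" and the String-to-String type parameters are chosen for illustration.

import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
import org.apache.flink.util.Collector;
import org.apache.sling.commons.json.JSONException;

public class TweetTextFlatMap extends JSONParseFlatMap<String, String> {
	private static final long serialVersionUID = 1L;

	@Override
	public void flatMap(String value, Collector<String> out) throws Exception {
		try {
			// getString() resolves the field through JSONParser, so nested
			// paths such as "user.name" work the same way.
			out.collect(getString(value, "text"));
		} catch (JSONException e) {
			// Field not present in this record; emit nothing.
		}
	}
}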

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..c1eabbd
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser contains a JSONObject and provides the ability to access
+ * embedded fields in JSON text.
+ */
+public class JSONParser {
+
+	private JSONObject originalJO;
+	private String searchedfield;
+	private Object temp;
+
+	/**
+	 * Construct a JSONParser from a string. The string has to be JSON text
+	 * from which we want to get a field.
+	 * 
+	 * @param jsonText
+	 *            A string which contains JSON text, i.e. the string
+	 *            representation of a JSON document.
+	 * @throws JSONException
+	 *             If there is a syntax error in the source string.
+	 */
+	public JSONParser(String jsonText) throws JSONException {
+		originalJO = new JSONObject(jsonText);
+	}
+
+	/**
+	 * 
+	 * Parse the JSON code passed to the constructor to find the given key.
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @return A JSONObject which has only one field called "retValue" and the
+	 *         value associated with it is the searched value. The methods of
+	 *         JSONObject can be used to get the field value in a desired
+	 *         format.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	public JSONObject parse(String key) throws JSONException {
+		initializeParser(key);
+		parsing();
+		return putResultInJSONObj();
+	}
+
+	/**
+	 * Prepare the fields of the class for parsing.
+	 * 
+	 * @param key
+	 *            The key whose value is searched for.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void initializeParser(String key) throws JSONException {
+		searchedfield = key;
+		temp = new JSONObject(originalJO.toString());
+	}
+
+	/**
+	 * This function goes through the given field and calls the appropriate
+	 * functions to handle the segments between the separators.
+	 * 
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void parsing() throws JSONException {
+		StringTokenizer st = new StringTokenizer(searchedfield, ".");
+		while (st.hasMoreTokens()) {
+			find(st.nextToken());
+		}
+	}
+
+	/**
+	 * Search for the next part of the field and update the state if it was
+	 * found.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the key is not found.
+	 */
+	private void find(String nextToken) throws JSONException {
+		if (endsWithBracket(nextToken)) {
+			treatAllBracket(nextToken);
+		} else {
+			temp = ((JSONObject) temp).get(nextToken);
+		}
+	}
+
+	/**
+	 * Determine whether the given string ends with a closing square bracket ']'
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @return True if the given string ends with a closing square bracket ']'
+	 *         and false otherwise.
+	 */
+	private boolean endsWithBracket(String nextToken) {
+		return nextToken.endsWith("]");
+	}
+
+	/**
+	 * Handle (multidimensional) arrays. Treat the square bracket pairs one
+	 * after the other if necessary.
+	 * 
+	 * @param nextToken
+	 *            The current part of the searched field.
+	 * @throws JSONException
+	 *             If the searched element is not found.
+	 */
+	private void treatAllBracket(String nextToken) throws JSONException {
+		List<String> list = Arrays.asList(nextToken.split("\\["));
+		ListIterator<String> iter = list.listIterator();
+
+		temp = ((JSONObject) temp).get(iter.next());
+
+		while (iter.hasNext()) {
+			int index = Integer.parseInt(cutBracket(iter.next()));
+			temp = ((JSONArray) temp).get(index);
+		}
+	}
+
+	/**
+	 * Remove the last character of the string.
+	 * 
+	 * @param string
+	 *            String to modify.
+	 * @return The given string without the last character.
+	 */
+	private String cutBracket(String string) {
+		return string.substring(0, string.length() - 1);
+	}
+
+	/**
+	 * Save the result of the search into a JSONObject.
+	 * 
+	 * @return A special JSONObject which contains only one key. The value
+	 *         associated to this key is the result of the search.
+	 * @throws JSONException
+	 *             If there is a problem creating the JSONObject. (e.g. invalid
+	 *             syntax)
+	 */
+	private JSONObject putResultInJSONObj() throws JSONException {
+		JSONObject jo = new JSONObject();
+		jo.put("retValue", temp);
+		return jo;
+	}
+
+}
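
For reference, the dotted/bracketed path syntax handled by parse() can be exercised on its own. The JSON text and the field path below are made-up examples:

import org.apache.flink.streaming.connectors.json.JSONParser;
import org.apache.sling.commons.json.JSONObject;

public class JSONParserSketch {
	public static void main(String[] args) throws Exception {
		// Nested objects are addressed with '.', array elements with "[index]".
		String json = "{\"user\":{\"name\":\"flink\"},"
				+ "\"entities\":{\"hashtags\":[{\"text\":\"streaming\"}]}}";
		JSONObject result = new JSONParser(json).parse("entities.hashtags[0].text");
		// parse() always wraps the result under the "retValue" key.
		System.out.println(result.getString("retValue")); // prints "streaming"
	}
}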

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
new file mode 100644
index 0000000..8dd4458
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.core.endpoint.Location;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.httpclient.auth.Authentication;
+
+/**
+ * 
+ * An extension of {@link TwitterSource} with filter parameters. This extension
+ * makes it possible to filter the Twitter stream by user-defined parameters.
+ */
+public class TwitterFilterSource extends TwitterSource {
+
+	private static final Logger LOG = LoggerFactory
+			.getLogger(TwitterFilterSource.class);
+
+	private static final long serialVersionUID = 1L;
+
+	private List<String> trackTerms = new LinkedList<String>();
+
+	private List<String> languages = new LinkedList<String>();
+
+	private List<Long> followings = new LinkedList<Long>();
+
+	private List<Location> locations = new LinkedList<Location>();
+
+	private Map<String, String> queryParameters = new HashMap<String, String>();
+
+	private Map<String, String> postParameters = new HashMap<String, String>();
+
+	public TwitterFilterSource(String authPath) {
+		super(authPath);
+	}
+
+	@Override
+	protected void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
+		configEndpoint(endpoint);
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	/**
+	 * This function configures the streaming endpoint
+	 * 
+	 * @param endpoint
+	 *            The streaming endpoint
+	 */
+	private void configEndpoint(StatusesFilterEndpoint endpoint) {
+		if (!trackTerms.isEmpty()) {
+			endpoint.trackTerms(trackTerms);
+		}
+		if (!languages.isEmpty()) {
+			endpoint.languages(languages);
+		}
+		if (!followings.isEmpty()) {
+			endpoint.followings(followings);
+		}
+		if (!locations.isEmpty()) {
+			endpoint.locations(locations);
+		}
+		if (!queryParameters.isEmpty()) {
+			for (Entry<String, String> entry : queryParameters.entrySet()) {
+				endpoint.addQueryParameter(entry.getKey(), entry.getValue());
+			}
+		}
+		if (!postParameters.isEmpty()) {
+			for (Entry<String, String> entry : postParameters.entrySet()) {
+				endpoint.addPostParameter(entry.getKey(), entry.getValue());
+			}
+		}
+	}
+
+	/**
+	 * This function adds a term to track.
+	 * 
+	 * @param term
+	 *            The term to track.
+	 */
+	public void trackTerm(String term) {
+		this.trackTerms.add(term);
+	}
+
+	/**
+	 * This function adds terms to track.
+	 * 
+	 * @param terms
+	 *            The terms to track.
+	 */
+	public void trackTerms(Collection<String> terms) {
+		this.trackTerms.addAll(terms);
+	}
+
+	/**
+	 * This function tells which terms are tracked.
+	 */
+	public List<String> getTrackTerms() {
+		return this.trackTerms;
+	}
+
+	/**
+	 * This function adds a language to filter on.
+	 * 
+	 * @param language
+	 *            The language to filter.
+	 */
+	public void filterLanguage(String language) {
+		this.languages.add(language);
+	}
+
+	/**
+	 * This function adds languages to filter on.
+	 * 
+	 * @param languages
+	 *            The languages to filter.
+	 */
+	public void filterLanguages(Collection<String> languages) {
+		this.languages.addAll(languages);
+	}
+
+	/**
+	 * This function tells which languages are filtered.
+	 */
+	public List<String> getLanguages() {
+		return this.languages;
+	}
+
+	/**
+	 * This function adds a user to follow.
+	 * 
+	 * @param userID
+	 *            The ID of the user to follow.
+	 */
+	public void filterFollowings(Long userID) {
+		this.followings.add(userID);
+	}
+
+	/**
+	 * This function adds users to follow.
+	 * 
+	 * @param userIDs
+	 *            The IDs of the users to follow.
+	 */
+	public void filterFollowings(Collection<Long> userIDs) {
+		this.followings.addAll(userIDs);
+	}
+
+	/**
+	 * This function tells which users are followed.
+	 */
+	public List<Long> getFollowings() {
+		return this.followings;
+	}
+
+	/**
+	 * This function adds a location to filter on.
+	 * 
+	 * @param location
+	 *            The location to filter.
+	 */
+	public void filterLocation(Location location) {
+		this.locations.add(location);
+	}
+
+	/**
+	 * This function adds locations to filter on.
+	 * 
+	 * @param locations
+	 *            The locations to filter.
+	 */
+	public void filterLocations(Collection<Location> locations) {
+		this.locations.addAll(locations);
+	}
+
+	/**
+	 * This function tells which locations are filtered.
+	 */
+	public List<Location> getLocations() {
+		return this.locations;
+	}
+
+	/**
+	 * This function sets a query parameter.
+	 * 
+	 * @param parameter
+	 *            The name of the query parameter.
+	 * @param value
+	 *            The value of the query parameter.
+	 */
+	public void addQueryParameter(String parameter, String value) {
+		this.queryParameters.put(parameter, value);
+	}
+
+	/**
+	 * This function sets query parameters.
+	 * 
+	 * @param queryParameters
+	 *            The query parameters for the endpoint.
+	 */
+	public void addQueryParameters(Map<String, String> queryParameters) {
+		this.queryParameters.putAll(queryParameters);
+	}
+
+	/**
+	 * This function tells which query parameters are used by the endpoint.
+	 */
+	public Map<String, String> getQueryParameters() {
+		return this.queryParameters;
+	}
+
+	/**
+	 * This function sets a post parameter.
+	 * 
+	 * @param parameter
+	 *            The name of the post parameter.
+	 * @param value
+	 *            The value of the post parameter.
+	 */
+	public void addPostParameter(String parameter, String value) {
+		this.postParameters.put(parameter, value);
+	}
+
+	/**
+	 * This function sets post parameters.
+	 * 
+	 * @param postParameters
+	 *              The post parameters for the endpoint.
+	 */
+	public void addPostParameters(Map<String, String> postParameters) {
+		this.postParameters.putAll(postParameters);
+	}
+
+	/**
+	 * This function tells which post parameters are used by the endpoint.
+	 */
+	public Map<String, String> postParameters() {
+		return this.postParameters;
+	}
+}
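
A brief configuration sketch for the filters above; all values are placeholders, and TwitterFilterSourceExample below shows the complete job wiring:

import java.util.Arrays;

import org.apache.flink.streaming.connectors.twitter.TwitterFilterSource;

public class FilterConfigSketch {
	public static void main(String[] args) {
		// The properties path and every filter value here are placeholders.
		TwitterFilterSource source = new TwitterFilterSource("/path/to/twitter.properties");
		source.trackTerm("flink");
		source.filterLanguages(Arrays.asList("en", "de"));
		source.filterFollowings(12345L);                 // a made-up user ID
		source.addQueryParameter("filter_level", "low"); // forwarded to the endpoint as-is
	}
}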

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
new file mode 100644
index 0000000..43cb179
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an example of how to use TwitterFilterSource. Before executing the
+ * example you have to define the access keys in twitter.properties in the
+ * resource folder. The access keys can be found in your Twitter account.
+ */
+public class TwitterFilterSourceExample {
+
+	/**
+	 * Path to the Twitter properties file.
+	 */
+	private static final String PATH_TO_AUTH_FILE = "/twitter.properties";
+
+	public static void main(String[] args) {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.getExecutionEnvironment();
+
+		TwitterFilterSource twitterSource = new TwitterFilterSource(
+				TwitterFilterSourceExample.class.getResource(PATH_TO_AUTH_FILE)
+						.getFile());
+
+		twitterSource.trackTerm("obama");
+		twitterSource.filterLanguage("en");
+
+		DataStream<String> streamSource = env.addSource(twitterSource).flatMap(
+				new JSONParseFlatMap<String, String>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public void flatMap(String s, Collector<String> c)
+							throws Exception {
+						c.collect(s);
+					}
+				});
+
+		streamSource.print();
+
+		try {
+			env.execute("Twitter Streaming Test");
+		} catch (Exception e) {
+			// For this example, printing the failure to stderr is enough.
+			e.printStackTrace();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bad0f8c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. This is not a parallel source because the Twitter API only allows
+ * two concurrent connections.
+ */
+public class TwitterSource extends RichSourceFunction<String> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+	private static final long serialVersionUID = 1L;
+	private String authPath;
+	protected transient BlockingQueue<String> queue;
+	protected int queueSize = 10000;
+	private transient BasicClient client;
+	private int waitSec = 5;
+
+	private int maxNumberOfTweets;
+	private int currentNumberOfTweets;
+
+	private transient volatile boolean isRunning;
+
+	/**
+	 * Create {@link TwitterSource} for streaming
+	 * 
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 */
+	public TwitterSource(String authPath) {
+		this.authPath = authPath;
+		maxNumberOfTweets = -1;
+	}
+
+	/**
+	 * Create {@link TwitterSource} to collect a finite number of tweets
+	 * 
+	 * @param authPath
+	 *            Location of the properties file containing the required
+	 *            authentication information.
+	 * @param numberOfTweets
+	 *            The maximum number of tweets to collect.
+	 */
+	public TwitterSource(String authPath, int numberOfTweets) {
+		this.authPath = authPath;
+		this.maxNumberOfTweets = numberOfTweets;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		initializeConnection();
+		currentNumberOfTweets = 0;
+		isRunning = true;
+	}
+
+	/**
+	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
+	 */
+	protected void initializeConnection() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing Twitter Streaming API connection");
+		}
+
+		queue = new LinkedBlockingQueue<String>(queueSize);
+
+		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+		endpoint.stallWarnings(false);
+
+		Authentication auth = authenticate();
+
+		initializeClient(endpoint, auth);
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Twitter Streaming API connection established successfully");
+		}
+	}
+
+	protected OAuth1 authenticate() {
+
+		Properties authenticationProperties = loadAuthenticationProperties();
+		
+		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+				authenticationProperties.getProperty("consumerSecret"),
+				authenticationProperties.getProperty("token"),
+				authenticationProperties.getProperty("secret"));
+	}
+
+	/**
+	 * Reads the given properties file for the authentication data.
+	 * 
+	 * @return the authentication data.
+	 */
+	private Properties loadAuthenticationProperties() {
+		
+		Properties properties = new Properties();
+		try {
+			InputStream input = new FileInputStream(authPath);
+			properties.load(input);
+			input.close();
+		} catch (Exception e) {
+			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
+		}
+		return properties;
+	}
+
+	protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
+
+		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+				.endpoint(endpoint).authentication(auth)
+				.processor(new StringDelimitedProcessor(queue)).build();
+		
+		client.connect();
+	}
+
+	@Override
+	public void close() {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initiating connection close");
+		}
+
+		if (client != null) {
+			client.stop();
+		}
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Connection closed successfully");
+		}
+	}
+
+	/**
+	 * Get the size of the queue in which the tweets are contained temporarily.
+	 * 
+	 * @return the size of the queue in which the tweets are contained
+	 *         temporarily
+	 */
+	public int getQueueSize() {
+		return queueSize;
+	}
+
+	/**
+	 * Set the size of the queue in which the tweets are contained temporarily.
+	 * 
+	 * @param queueSize
+	 *            The desired value.
+	 */
+	public void setQueueSize(int queueSize) {
+		this.queueSize = queueSize;
+	}
+
+	/**
+	 * This function tells how long TwitterSource waits for the tweets.
+	 * 
+	 * @return Number of seconds.
+	 */
+	public int getWaitSec() {
+		return waitSec;
+	}
+
+	/**
+	 * This function sets how long TwitterSource should wait for the tweets.
+	 * 
+	 * @param waitSec
+	 *            The desired value.
+	 */
+	public void setWaitSec(int waitSec) {
+		this.waitSec = waitSec;
+	}
+
+	@Override
+	public void run(SourceContext<String> ctx) throws Exception {
+		while (isRunning) {
+			if (client.isDone()) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+							.getMessage());
+				}
+				break;
+			}
+
+			ctx.collect(queue.take());
+			currentNumberOfTweets++;
+
+			if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
+				break;
+			}
+		}
+	}
+
+	@Override
+	public void cancel() {
+		isRunning = false;
+	}
+}
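
A minimal sketch of wiring the source into a job with a tweet limit; the path and the numbers are placeholders, and the TwitterStreaming / TwitterTopology classes below are the complete examples shipped with the connector:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.twitter.TwitterSource;

public class TwitterSourceSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Stop after 500 tweets; the single-argument constructor streams indefinitely.
		TwitterSource source = new TwitterSource("/path/to/twitter.properties", 500);
		source.setQueueSize(2000); // buffer between the Hosebird client and Flink

		DataStream<String> tweets = env.addSource(source); // raw tweet JSON strings
		tweets.print();
		env.execute("Twitter source sketch");
	}
}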

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..a80c32a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwitterStreaming {
+
+	private static final int PARALLELISM = 1;
+	private static final int SOURCE_PARALLELISM = 1;
+	private static final int NUMBEROFTWEETS = 100;
+
+	private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
+
+	public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
+			System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
+			System.out.println("NAME: " + tuple.f4);
+			System.out.println("TEXT: " + tuple.f3);
+			System.out.println("");
+		}
+
+	}
+
+	public static class SelectDataFlatMap extends
+			JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
+				throws Exception {
+			try {
+				out.collect(new Tuple5<Long, Integer, String, String, String>(
+						getLong(value, "id"),
+						getInt(value, "entities.hashtags[0].indices[1]"),
+						getString(value, "lang"),
+						getString(value, "text"),
+						getString(value, "user.name")));
+			} catch (JSONException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Field not found");
+				}
+			}
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path;
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment
+				.createLocalEnvironment(PARALLELISM);
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+				.setParallelism(SOURCE_PARALLELISM);
+
+		DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
+				.flatMap(new SelectDataFlatMap());
+
+		selectedDataStream.addSink(new TwitterSink());
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..b1fc92c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+* This program demonstrates the use of TwitterSource.
+* Its aim is to count the frequency of the languages of tweets.
+*/
+public class TwitterTopology {
+
+	private static final int NUMBEROFTWEETS = 100;
+
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			try{
+				out.collect(getString(value, "lang"));
+			}
+			catch (JSONException e){
+				out.collect("");
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path;
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
+
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				})
+				.keyBy(0)
+				.sum(1);
+
+		dataStream.print();
+
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties b/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
new file mode 100644
index 0000000..1ca4143
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b1d4115
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+	private String jsonText;
+	private String searchedField;
+
+	public JSONParserTest(String text, String field) {
+		jsonText = text;
+		searchedField = field;
+	}
+
+	@Parameters
+	public static Collection<Object[]> initParameterList() {
+
+		Object[][] parameterList = new Object[][] { 
+				{ "{\"key\":\"value\"}", 							"key" },
+				{ "{\"key\":[\"value\"]}", 							"key[0]" },
+				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
+				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
+				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
+				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
+				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+				};
+
+		return Arrays.asList(parameterList);
+	}
+
+	@Test
+	public void test() {
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+			String expected = "{\"retValue\":\"value\"}";
+
+			assertTrue(expected.equals(jo.toString()));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..8851086
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+	
+	@Test
+	public void testGetBooleanFunction() {
+		String jsonText = "{\"key\":true}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertTrue(jo.getBoolean("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetDoubleFunction() {
+		double expected = 12345.12345;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getDouble("retValue"),0.000001);
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetIntFunction() {
+		int expected = 15;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getInt("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testGetLongFunction() {
+		long expected = 111111111111L;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected,jo.getLong("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9ede613
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml
new file mode 100644
index 0000000..dc395b2
--- /dev/null
+++ b/flink-streaming-connectors/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-connectors-parent</artifactId>
+	<name>flink-streaming-connectors</name>
+
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>flink-connector-flume</module>
+		<module>flink-connector-kafka</module>
+		<module>flink-connector-elasticsearch</module>
+		<module>flink-connector-rabbitmq</module>
+		<module>flink-connector-twitter</module>
+		<module>flink-connector-nifi</module>
+	</modules>
+
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<profile>
+			<id>hadoop-2</id>
+			<activation>
+				<property>
+					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+					<!--hadoop2--><name>!hadoop.profile</name>
+				</property>
+			</activation>
+			<modules>
+				<!-- Include the flink-connector-filesystem project only for Hadoop 2.
+				 	The HDFS minicluster interfaces changed between the two Hadoop versions.
+				 -->
+				<module>flink-connector-filesystem</module>
+			</modules>
+		</profile>
+	</profiles>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/pom.xml b/flink-streaming-examples/pom.xml
new file mode 100644
index 0000000..5e06411
--- /dev/null
+++ b/flink-streaming-examples/pom.xml
@@ -0,0 +1,535 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-parent</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-streaming-examples</artifactId>
+	<name>flink-streaming-examples</name>
+
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-twitter</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-tests</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- get default data from flink-java-examples package -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<version>2.9</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<execution>
+						<id>unpack</id>
+						<phase>prepare-package</phase>
+						<goals>
+							<goal>unpack</goal>
+						</goals>
+						<configuration>
+							<artifactItems>
+								<!-- For WordCount example data -->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-java-examples</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+								</artifactItem>
+								<!-- For JSON utilities -->
+								<artifactItem>
+									<groupId>org.apache.flink</groupId>
+									<artifactId>flink-connector-twitter</artifactId>
+									<version>${project.version}</version>
+									<type>jar</type>
+									<overWrite>false</overWrite>
+									<outputDirectory>${project.build.directory}/classes</outputDirectory>
+									<includes>org/apache/flink/streaming/connectors/json/*</includes>
+								</artifactItem>
+							</artifactItems>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<!-- self-contained jars for each example -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
+				<executions>
+					<!-- Default Execution -->
+					<execution>
+						<id>default</id>
+						<phase>package</phase>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+					
+					<!-- Iteration -->
+					<execution>
+						<id>Iteration</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Iteration</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/iteration/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- IncrementalLearning -->
+					<execution>
+						<id>IncrementalLearning</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>IncrementalLearning</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/ml/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- Twitter -->
+					<execution>
+						<id>Twitter</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>Twitter</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/twitter/*.class</include>
+								<include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
+								<include>org/apache/flink/streaming/connectors/json/*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowJoin -->
+					<execution>
+						<id>WindowJoin</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowJoin</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/join/*.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCountPOJO -->
+					<execution>
+						<id>WordCountPOJO</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCountPOJO</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>			
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WordCount -->
+					<execution>
+						<id>WordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>				
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- WindowWordCount -->
+					<execution>
+						<id>WindowWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>WindowWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+								<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SocketTextStreamWordCount -->
+					<execution>
+						<id>SocketTextStreamWordCount</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SocketTextStreamWordCount</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- TopSpeedWindowing -->
+					<execution>
+						<id>TopSpeedWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>TopSpeedWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<!-- SessionWindowing -->
+					<execution>
+						<id>SessionWindowing</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+						<configuration>
+							<classifier>SessionWindowing</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.streaming.examples.windowing.SessionWindowing</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/streaming/examples/windowing/SessionWindowing.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/SessionWindowing$*.class</include>
+							</includes>
+						</configuration>
+					</execution>
+
+				</executions>
+			</plugin>
+
+
+			<!-- Scala Compiler -->
+			<plugin>
+				<groupId>net.alchim31.maven</groupId>
+				<artifactId>scala-maven-plugin</artifactId>
+				<version>3.1.4</version>
+				<executions>
+					<!-- Run scala compiler in the process-resources phase, so that dependencies on
+						scala classes can be resolved later in the (Java) compile phase -->
+					<execution>
+						<id>scala-compile-first</id>
+						<phase>process-resources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+					</execution>
+ 
+					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+						 scala classes can be resolved later in the (Java) test-compile phase -->
+					<execution>
+						<id>scala-test-compile</id>
+						<phase>process-test-resources</phase>
+						<goals>
+							<goal>testCompile</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<jvmArgs>
+						<jvmArg>-Xms128m</jvmArg>
+						<jvmArg>-Xmx512m</jvmArg>
+					</jvmArgs>
+				</configuration>
+			</plugin>
+			
+			<!-- Eclipse Integration -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-eclipse-plugin</artifactId>
+				<version>2.8</version>
+				<configuration>
+					<downloadSources>true</downloadSources>
+					<projectnatures>
+						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
+					</projectnatures>
+					<buildcommands>
+						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+					</buildcommands>
+					<classpathContainers>
+						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+					</classpathContainers>
+					<excludes>
+						<exclude>org.scala-lang:scala-library</exclude>
+						<exclude>org.scala-lang:scala-compiler</exclude>
+					</excludes>
+					<sourceIncludes>
+						<sourceInclude>**/*.scala</sourceInclude>
+						<sourceInclude>**/*.java</sourceInclude>
+					</sourceIncludes>
+				</configuration>
+			</plugin>
+
+			<!-- Adding scala source directories to build path -->
+			<plugin>
+				<groupId>org.codehaus.mojo</groupId>
+				<artifactId>build-helper-maven-plugin</artifactId>
+				<version>1.7</version>
+				<executions>
+					<!-- Add src/main/scala to eclipse build path -->
+					<execution>
+						<id>add-source</id>
+						<phase>generate-sources</phase>
+						<goals>
+							<goal>add-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/main/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+					<!-- Add src/test/scala to eclipse build path -->
+					<execution>
+						<id>add-test-source</id>
+						<phase>generate-test-sources</phase>
+						<goals>
+							<goal>add-test-source</goal>
+						</goals>
+						<configuration>
+							<sources>
+								<source>src/test/scala</source>
+							</sources>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
+			<plugin>
+				<groupId>org.scalastyle</groupId>
+				<artifactId>scalastyle-maven-plugin</artifactId>
+				<version>0.5.0</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<verbose>false</verbose>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<failOnWarning>false</failOnWarning>
+					<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+					<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+					<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+					<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+					<outputEncoding>UTF-8</outputEncoding>
+				</configuration>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>org.apache.maven.plugins</groupId>
+										<artifactId>maven-dependency-plugin</artifactId>
+										<versionRange>[2.9,)</versionRange>
+										<goals>
+											<goal>unpack</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore/>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+
+	</build>
+
+</project>
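
Each maven-jar-plugin execution above stamps a program-class entry into the manifest of the corresponding example jar; that attribute is what the Flink client consults to locate the entry point when a jar is submitted without an explicit main class. As a hedged, self-contained sketch (not part of this commit; the class name and jar path are illustrative), the attribute can be inspected with plain java.util.jar:

	import java.util.jar.JarFile;
	import java.util.jar.Manifest;

	public class ShowProgramClass {
		public static void main(String[] args) throws Exception {
			// args[0]: path to one of the per-example jars built by the executions above
			try (JarFile jar = new JarFile(args[0])) {
				Manifest manifest = jar.getManifest();
				if (manifest != null) {
					// the same attribute name that the <program-class> manifest entries declare
					String entryClass = manifest.getMainAttributes().getValue("program-class");
					System.out.println("Declared Flink entry point: " + entryClass);
				}
			}
		}
	}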

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
new file mode 100644
index 0000000..2cf66b9
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Example illustrating iterations in Flink streaming.
+ *
+ * <p>The program sums up random numbers and counts the additions
+ * it performs to reach a specific threshold in an iterative streaming fashion.</p>
+ *
+ * <p>This example shows how to use:
+ * <ul>
+ *   <li>streaming iterations,</li>
+ *   <li>buffer timeout to reduce latency,</li>
+ *   <li>directed outputs.</li>
+ * </ul>
+ */
+public class IterateExample {
+
+	private static final int BOUND = 100;
+
+	// *************************************************************************
+	// PROGRAM
+	// *************************************************************************
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		// set up input for the stream of integer pairs
+
+		// obtain the execution environment and set the buffer timeout to 1 to enable
+		// continuous flushing of the output buffers (lowest latency)
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+				.setBufferTimeout(1);
+
+		// create input stream of integer pairs
+		DataStream<Tuple2<Integer, Integer>> inputStream;
+		if (fileInput) {
+			inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+		} else {
+			inputStream = env.addSource(new RandomFibonacciSource());
+		}
+
+		// create an iterative data stream from the input with 5 second timeout
+		IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
+				.iterate(5000);
+
+		// apply the step function to get the next Fibonacci number
+		// increment the counter and split the output with the output selector
+		SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
+				.split(new MySelector());
+
+		// close the iteration by selecting the tuples that were directed to the
+		// 'iterate' channel in the output selector
+		it.closeWith(step.select("iterate"));
+
+		// to produce the final output, select the tuples directed to the
+		// 'output' channel and map them back to the original input pair
+		// together with their iteration counter
+		DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
+				.map(new OutputMap());
+
+		// emit results
+		if (fileOutput) {
+			numbers.writeAsText(outputPath, 1);
+		} else {
+			numbers.print();
+		}
+
+		// execute the program
+		env.execute("Streaming Iteration Example");
+	}
+
+	// *************************************************************************
+	// USER FUNCTIONS
+	// *************************************************************************
+
+	/**
+	 * Generate up to BOUND random integer pairs, each component drawn from the range 1 to BOUND/2 - 1
+	 */
+	private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		private Random rnd = new Random();
+
+		private volatile boolean isRunning = true;
+		private int counter = 0;
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+
+			while (isRunning && counter < BOUND) {
+				int first = rnd.nextInt(BOUND / 2 - 1) + 1;
+				int second = rnd.nextInt(BOUND / 2 - 1) + 1;
+
+				ctx.collect(new Tuple2<Integer, Integer>(first, second));
+				counter++;
+				Thread.sleep(50L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Parse a textual input pair of the form "(x,y)" into a Tuple2 of integers
+	 */
+	private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Integer, Integer> map(String value) throws Exception {
+			String record = value.substring(1, value.length() - 1);
+			String[] splitted = record.split(",");
+			return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
+		}
+	}
+
+	/**
+	 * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple.
+	 * A counter is attached to the tuple and incremented in every iteration step.
+	 */
+	public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
+			Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
+				Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+		}
+	}
+
+	/**
+	 * Iteration step function that calculates the next Fibonacci number
+	 */
+	public static class Step implements
+			MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
+					Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
+				Integer> value) throws Exception {
+			return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
+					value.f3, ++value.f4);
+		}
+	}
+
+	/**
+	 * OutputSelector that decides which tuples need to be iterated again and which are forwarded to the output.
+	 */
+	public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
+			List<String> output = new ArrayList<String>();
+			if (value.f2 < BOUND && value.f3 < BOUND) {
+				output.add("iterate");
+			} else {
+				output.add("output");
+			}
+			return output;
+		}
+	}
+
+	/**
+	 * Return the original input pair together with its iteration counter
+	 */
+	public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
+			Tuple2<Tuple2<Integer, Integer>, Integer>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
+				value) throws
+				Exception {
+			return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
+					value.f4);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileInput = false;
+	private static boolean fileOutput = false;
+	private static String inputPath;
+	private static String outputPath;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			if (args.length == 1) {
+				fileOutput = true;
+				outputPath = args[0];
+			} else if (args.length == 2) {
+				fileInput = true;
+				inputPath = args[0];
+				fileOutput = true;
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: IterateExample [<input path>] <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing IterateExample with generated data.");
+			System.out.println("  Provide a result path to write the output to a file.");
+			System.out.println("  Usage: IterateExample [<input path>] <result path>");
+		}
+		return true;
+	}
+
+}
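
The example above exercises the full streaming iteration pattern described in its javadoc: iterate() opens a feedback edge, split() with an OutputSelector routes each tuple either back into the loop or to the output, and closeWith() wires up the feedback channel. A hedged, minimal sketch of just that skeleton against the same 0.10 streaming API (everything below is illustrative and not part of this commit; values are simply halved until they drop below 2):

	import org.apache.flink.api.common.functions.MapFunction;
	import org.apache.flink.streaming.api.collector.selector.OutputSelector;
	import org.apache.flink.streaming.api.datastream.DataStream;
	import org.apache.flink.streaming.api.datastream.IterativeStream;
	import org.apache.flink.streaming.api.datastream.SplitStream;
	import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

	import java.util.Collections;

	public class MinimalIterationSketch {

		public static void main(String[] args) throws Exception {
			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

			// a few start values; each will be halved until it drops below 2
			DataStream<Long> input = env.fromElements(64L, 100L, 1024L);

			// open the feedback loop; tuples may wait at most 5 seconds for feedback
			IterativeStream<Long> it = input.iterate(5000);

			// iteration step: halve the value, then decide where it goes next
			SplitStream<Long> step = it.map(new MapFunction<Long, Long>() {
				@Override
				public Long map(Long value) {
					return value / 2;
				}
			}).split(new OutputSelector<Long>() {
				@Override
				public Iterable<String> select(Long value) {
					// values that still need work go back into the loop
					return Collections.singletonList(value >= 2 ? "iterate" : "output");
				}
			});

			// feed the 'iterate' channel back and emit the finished values
			it.closeWith(step.select("iterate"));
			step.select("output").print();

			env.execute("Minimal streaming iteration sketch");
		}
	}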

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
new file mode 100644
index 0000000..0077459
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration.util;
+
+public class IterateExampleData {
+	public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
+			"(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
+			"(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";
+
+	public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
+			"((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
+			"((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
+			"((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";
+
+	private IterateExampleData() {
+	}
+}