You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:23 UTC
[07/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/pom.xml b/flink-streaming-connectors/flink-connector-twitter/pom.xml
new file mode 100644
index 0000000..df963a3
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/pom.xml
@@ -0,0 +1,97 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-twitter</artifactId>
+ <name>flink-connector-twitter</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>hbc-core</artifactId>
+ <version>2.2.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.json</artifactId>
+ <version>2.0.6</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <!-- We include all dependencies that transitively depend on guava -->
+ <include>com.twitter:hbc-core</include>
+ <include>com.twitter:joauth</include>
+ </includes>
+ </artifactSet>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
new file mode 100644
index 0000000..0f16541
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
+ *
+ * @param <IN>
+ * Type of the input elements.
+ * @param <OUT>
+ * Type of the returned elements.
+ */
+public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ // private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
+
+ /**
+ * Get the value object associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public Object get(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).get("retValue");
+ }
+
+ /**
+ * Get the boolean value associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public boolean getBoolean(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getBoolean("retValue");
+ }
+
+ /**
+ * Get the double value associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public double getDouble(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getDouble("retValue");
+ }
+
+ /**
+ * Get the int value associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public int getInt(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getInt("retValue");
+ }
+
+ /**
+ * Get the long value associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public long getLong(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getLong("retValue");
+ }
+
+ /**
+ * Get the String value associated with a key form a JSON code. It can find
+ * embedded fields, too.
+ *
+ * @param jsonText
+ * JSON String in which the field is searched.
+ * @param field
+ * The key whose value is searched for.
+ * @return The object associated with the field.
+ * @throws JSONException
+ * If the field is not found.
+ */
+ public String getString(String jsonText, String field) throws JSONException {
+ JSONParser parser = new JSONParser(jsonText);
+
+ return parser.parse(field).getString("retValue");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
new file mode 100644
index 0000000..c1eabbd
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.StringTokenizer;
+
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+
+/**
+ * A JSONParser contains a JSONObject and provides opportunity to access
+ * embedded fields in JSON code.
+ */
+public class JSONParser {
+
+ private JSONObject originalJO;
+ private String searchedfield;
+ private Object temp;
+
+ /**
+ * Construct a JSONParser from a string. The string has to be a JSON code
+ * from which we want to get a field.
+ *
+ * @param jsonText
+ * A string which contains a JSON code. String representation of
+ * a JSON code.
+ * @throws JSONException
+ * If there is a syntax error in the source string.
+ */
+ public JSONParser(String jsonText) throws JSONException {
+ originalJO = new JSONObject(jsonText);
+ }
+
+ /**
+ *
+ * Parse the JSON code passed to the constructor to find the given key.
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @return A JSONObject which has only one field called "retValue" and the
+ * value associated to it is the searched value. The methods of
+ * JSONObject can be used to get the field value in a desired
+ * format.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ public JSONObject parse(String key) throws JSONException {
+ initializeParser(key);
+ parsing();
+ return putResultInJSONObj();
+ }
+
+ /**
+ * Prepare the fields of the class for the parsing
+ *
+ * @param key
+ * The key whose value is searched for.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void initializeParser(String key) throws JSONException {
+ searchedfield = key;
+ temp = new JSONObject(originalJO.toString());
+ }
+
+ /**
+ * This function goes through the given field and calls the appropriate
+ * functions to treat the units between the punctuation marks.
+ *
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void parsing() throws JSONException {
+ StringTokenizer st = new StringTokenizer(searchedfield, ".");
+ while (st.hasMoreTokens()) {
+ find(st.nextToken());
+ }
+ }
+
+ /**
+ * Search for the next part of the field and update the state if it was
+ * found.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the key is not found.
+ */
+ private void find(String nextToken) throws JSONException {
+ if (endsWithBracket(nextToken)) {
+ treatAllBracket(nextToken);
+ } else {
+ temp = ((JSONObject) temp).get(nextToken);
+ }
+ }
+
+ /**
+ * Determine whether the given string ends with a closing square bracket ']'
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @return True if the given string ends with a closing square bracket ']'
+ * and false otherwise.
+ */
+ private boolean endsWithBracket(String nextToken) {
+ return nextToken.substring(nextToken.length() - 1).endsWith("]");
+ }
+
+ /**
+ * Handle (multidimensional) arrays. Treat the square bracket pairs one
+ * after the other if necessary.
+ *
+ * @param nextToken
+ * The current part of the searched field.
+ * @throws JSONException
+ * If the searched element is not found.
+ */
+ private void treatAllBracket(String nextToken) throws JSONException {
+ List<String> list = Arrays.asList(nextToken.split("\\["));
+ ListIterator<String> iter = list.listIterator();
+
+ temp = ((JSONObject) temp).get(iter.next());
+
+ while (iter.hasNext()) {
+ int index = Integer.parseInt(cutBracket(iter.next()));
+ temp = ((JSONArray) temp).get(index);
+ }
+ }
+
+ /**
+ * Remove the last character of the string.
+ *
+ * @param string
+ * String to modify.
+ * @return The given string without the last character.
+ */
+ private String cutBracket(String string) {
+ return string.substring(0, string.length() - 1);
+ }
+
+ /**
+ * Save the result of the search into a JSONObject.
+ *
+ * @return A special JSONObject which contain only one key. The value
+ * associated to this key is the result of the search.
+ * @throws JSONException
+ * If there is a problem creating the JSONObject. (e.g. invalid
+ * syntax)
+ */
+ private JSONObject putResultInJSONObj() throws JSONException {
+ JSONObject jo = new JSONObject();
+ jo.put("retValue", temp);
+ return jo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
new file mode 100644
index 0000000..8dd4458
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.core.endpoint.Location;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
+import com.twitter.hbc.httpclient.auth.Authentication;
+
+/**
+ *
+ * An extension of {@link TwitterSource} by filter parameters. This extension
+ * enables to filter the twitter stream by user defined parameters.
+ */
+public class TwitterFilterSource extends TwitterSource {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TwitterFilterSource.class);
+
+ private static final long serialVersionUID = 1L;
+
+ private List<String> trackTerms = new LinkedList<String>();
+
+ private List<String> languages = new LinkedList<String>();
+
+ private List<Long> followings = new LinkedList<Long>();
+
+ private List<Location> locations = new LinkedList<Location>();
+
+ private Map<String, String> queryParameters = new HashMap<String, String>();
+
+ private Map<String, String> postParameters = new HashMap<String, String>();
+
+ public TwitterFilterSource(String authPath) {
+ super(authPath);
+ }
+
+ @Override
+ protected void initializeConnection() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initializing Twitter Streaming API connection");
+ }
+ queue = new LinkedBlockingQueue<String>(queueSize);
+
+ StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
+ configEndpoint(endpoint);
+ endpoint.stallWarnings(false);
+
+ Authentication auth = authenticate();
+
+ initializeClient(endpoint, auth);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Twitter Streaming API connection established successfully");
+ }
+ }
+
+ /**
+ * This function configures the streaming endpoint
+ *
+ * @param endpoint
+ * The streaming endpoint
+ */
+ private void configEndpoint(StatusesFilterEndpoint endpoint) {
+ if (!trackTerms.isEmpty()) {
+ endpoint.trackTerms(trackTerms);
+ }
+ if (!languages.isEmpty()) {
+ endpoint.languages(languages);
+ }
+ if (!followings.isEmpty()) {
+ endpoint.followings(followings);
+ }
+ if (!locations.isEmpty()) {
+ endpoint.locations(locations);
+ }
+ if (!queryParameters.isEmpty()) {
+ for (Entry<String, String> entry : queryParameters.entrySet()) {
+ endpoint.addQueryParameter(entry.getKey(), entry.getValue());
+ }
+ }
+ if (!postParameters.isEmpty()) {
+ for (Entry<String, String> entry : postParameters.entrySet()) {
+ endpoint.addPostParameter(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ /**
+ * This function sets which term to track.
+ *
+ * @param term
+ * The term to track.
+ */
+ public void trackTerm(String term) {
+ this.trackTerms.add(term);
+ }
+
+ /**
+ * This function sets which terms to track.
+ *
+ * @param terms
+ * The terms to track.
+ */
+ public void trackTerms(Collection<String> terms) {
+ this.trackTerms.addAll(terms);
+ }
+
+ /**
+ * This function tells which terms are tracked.
+ */
+ public List<String> getTrackTerms() {
+ return this.trackTerms;
+ }
+
+ /**
+ * This function sets which language to filter.
+ *
+ * @param language
+ * The language to filter.
+ */
+ public void filterLanguage(String language) {
+ this.languages.add(language);
+ }
+
+ /**
+ * This function sets which languages to filter.
+ *
+ * @param languages
+ * The languages to filter.
+ */
+ public void filterLanguages(Collection<String> languages) {
+ this.languages.addAll(languages);
+ }
+
+ /**
+ * This function tells which languages are filtered.
+ */
+ public List<String> getLanguages() {
+ return this.languages;
+ }
+
+ /**
+ * This function sets which user to follow.
+ *
+ * @param userID
+ * The ID of the user to follow.
+ */
+ public void filterFollowings(Long userID) {
+ this.followings.add(userID);
+ }
+
+ /**
+ * This function sets which users to follow.
+ *
+ * @param userIDs
+ * The IDs of the users to follow.
+ */
+ public void filterFollowings(Collection<Long> userIDs) {
+ this.followings.addAll(userIDs);
+ }
+
+ /**
+ * This function tells which users are followed.
+ */
+ public List<Long> getFollowings() {
+ return this.followings;
+ }
+
+ /**
+ * This function sets which location to filter.
+ *
+ * @param location
+ * The location to filter.
+ */
+ public void filterLocation(Location location) {
+ this.locations.add(location);
+ }
+
+ /**
+ * This function sets which locations to filter.
+ *
+ * @param locations
+ * The locations to filter.
+ */
+ public void filterLocations(Collection<Location> locations) {
+ this.locations.addAll(locations);
+ }
+
+ /**
+ * This function tells which locations are filtered.
+ */
+ public List<Location> getLocations() {
+ return this.locations;
+ }
+
+ /**
+ * This function sets a query parameter.
+ *
+ * @param parameter
+ * The name of the query parameter.
+ * @param value
+ * The value of the query parameter.
+ */
+ public void addQueryParameter(String parameter, String value) {
+ this.queryParameters.put(parameter, value);
+ }
+
+ /**
+ * This function sets query parameters.
+ *
+ * @param queryParameters
+ * The query parameters for the endpoint.
+ */
+ public void addQueryParameters(Map<String, String> queryParameters) {
+ this.queryParameters.putAll(queryParameters);
+ }
+
+ /**
+ * This function tells which query parameters are used by the endpoint.
+ */
+ public Map<String, String> getQueryParameters() {
+ return this.queryParameters;
+ }
+
+ /**
+ * This function sets a post parameter.
+ *
+ * @param parameter
+ * The name of the post parameter.
+ * @param value
+ * The value of the post parameter.
+ */
+ public void addPostParameter(String parameter, String value) {
+ this.postParameters.put(parameter, value);
+ }
+
+ /**
+ * This function sets post parameters.
+ *
+ * @param postParameters
+ * The post parameters for the endpoint.
+ */
+ public void addPostParameters(Map<String, String> postParameters) {
+ this.postParameters.putAll(postParameters);
+ }
+
+ /**
+ * This function tells which post parameters are used by the endpoint.
+ */
+ public Map<String, String> postParameters() {
+ return this.postParameters;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
new file mode 100644
index 0000000..43cb179
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an example how to use TwitterFilterSource. Before executing the
+ * example you have to define the access keys of twitter.properties in the
+ * resource folder. The access keys can be found in your twitter account.
+ */
+public class TwitterFilterSourceExample {
+
+ /**
+ * path to the twitter properties
+ */
+ private static final String PATH_TO_AUTH_FILE = "/twitter.properties";
+
+ public static void main(String[] args) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();
+
+ TwitterFilterSource twitterSource = new TwitterFilterSource(
+ TwitterFilterSourceExample.class.getResource(PATH_TO_AUTH_FILE)
+ .getFile());
+
+ twitterSource.trackTerm("obama");
+ twitterSource.filterLanguage("en");
+
+ DataStream<String> streamSource = env.addSource(twitterSource).flatMap(
+ new JSONParseFlatMap<String, String>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String s, Collector<String> c)
+ throws Exception {
+ c.collect(s);
+ }
+ });
+
+ streamSource.print();
+
+ try {
+ env.execute("Twitter Streaming Test");
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
new file mode 100644
index 0000000..bad0f8c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.processor.StringDelimitedProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Implementation of {@link SourceFunction} specialized to emit tweets from
+ * Twitter. This is not a parallel source because the Twitter API only allows
+ * two concurrent connections.
+ */
+public class TwitterSource extends RichSourceFunction<String> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
+
+ private static final long serialVersionUID = 1L;
+ private String authPath;
+ protected transient BlockingQueue<String> queue;
+ protected int queueSize = 10000;
+ private transient BasicClient client;
+ private int waitSec = 5;
+
+ private int maxNumberOfTweets;
+ private int currentNumberOfTweets;
+
+ private transient volatile boolean isRunning;
+
+ /**
+ * Create {@link TwitterSource} for streaming
+ *
+ * @param authPath
+ * Location of the properties file containing the required
+ * authentication information.
+ */
+ public TwitterSource(String authPath) {
+ this.authPath = authPath;
+ maxNumberOfTweets = -1;
+ }
+
+ /**
+ * Create {@link TwitterSource} to collect finite number of tweets
+ *
+ * @param authPath
+ * Location of the properties file containing the required
+ * authentication information.
+ * @param numberOfTweets
+ *
+ */
+ public TwitterSource(String authPath, int numberOfTweets) {
+ this.authPath = authPath;
+ this.maxNumberOfTweets = numberOfTweets;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ initializeConnection();
+ currentNumberOfTweets = 0;
+ isRunning = true;
+ }
+
+ /**
+ * Initialize Hosebird Client to be able to consume Twitter's Streaming API
+ */
+ protected void initializeConnection() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initializing Twitter Streaming API connection");
+ }
+
+ queue = new LinkedBlockingQueue<String>(queueSize);
+
+ StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+ endpoint.stallWarnings(false);
+
+ Authentication auth = authenticate();
+
+ initializeClient(endpoint, auth);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Twitter Streaming API connection established successfully");
+ }
+ }
+
+ protected OAuth1 authenticate() {
+
+ Properties authenticationProperties = loadAuthenticationProperties();
+
+ return new OAuth1(authenticationProperties.getProperty("consumerKey"),
+ authenticationProperties.getProperty("consumerSecret"),
+ authenticationProperties.getProperty("token"),
+ authenticationProperties.getProperty("secret"));
+ }
+
+ /**
+ * Reads the given properties file for the authentication data.
+ *
+ * @return the authentication data.
+ */
+ private Properties loadAuthenticationProperties() {
+
+ Properties properties = new Properties();
+ try {
+ InputStream input = new FileInputStream(authPath);
+ properties.load(input);
+ input.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot open .properties file: " + authPath, e);
+ }
+ return properties;
+ }
+
+ protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
+
+ client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
+ .endpoint(endpoint).authentication(auth)
+ .processor(new StringDelimitedProcessor(queue)).build();
+
+ client.connect();
+ }
+
+ @Override
+ public void close() {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Initiating connection close");
+ }
+
+ if (client != null) {
+ client.stop();
+ }
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Connection closed successfully");
+ }
+ }
+
+ /**
+ * Get the size of the queue in which the tweets are contained temporarily.
+ *
+ * @return the size of the queue in which the tweets are contained
+ * temporarily
+ */
+ public int getQueueSize() {
+ return queueSize;
+ }
+
+ /**
+ * Set the size of the queue in which the tweets are contained temporarily.
+ *
+ * @param queueSize
+ * The desired value.
+ */
+ public void setQueueSize(int queueSize) {
+ this.queueSize = queueSize;
+ }
+
+ /**
+ * This function tells how long TwitterSource waits for the tweets.
+ *
+ * @return Number of second.
+ */
+ public int getWaitSec() {
+ return waitSec;
+ }
+
+ /**
+ * This function sets how long TwitterSource should wait for the tweets.
+ *
+ * @param waitSec
+ * The desired value.
+ */
+ public void setWaitSec(int waitSec) {
+ this.waitSec = waitSec;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ while (isRunning) {
+ if (client.isDone()) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
+ .getMessage());
+ }
+ break;
+ }
+
+ ctx.collect(queue.take());
+
+ if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
new file mode 100644
index 0000000..a80c32a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
@@ -0,0 +1,99 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TwitterStreaming {
+
+ private static final int PARALLELISM = 1;
+ private static final int SOURCE_PARALLELISM = 1;
+ private static final int NUMBEROFTWEETS = 100;
+
+ private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
+
+ public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
+ System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
+ System.out.println("NAME: " + tuple.f4);
+ System.out.println("TEXT: " + tuple.f3);
+ System.out.println("");
+ }
+
+ }
+
+ public static class SelectDataFlatMap extends
+ JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
+ throws Exception {
+ try {
+ out.collect(new Tuple5<Long, Integer, String, String, String>(
+ getLong(value, "id"),
+ getInt(value, "entities.hashtags[0].indices[1]"),
+ getString(value, "lang"),
+ getString(value, "text"),
+ getString(value, "user.name")));
+ } catch (JSONException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Field not found");
+ }
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ String path = new String();
+
+ if (args != null && args.length == 1) {
+ path = args[0];
+ } else {
+ System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .createLocalEnvironment(PARALLELISM);
+
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
+ .setParallelism(SOURCE_PARALLELISM);
+
+ DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
+ .flatMap(new SelectDataFlatMap());
+
+ selectedDataStream.addSink(new TwitterSink());
+
+ env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..b1fc92c
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+* This program demonstrate the use of TwitterSource.
+* Its aim is to count the frequency of the languages of tweets
+*/
+public class TwitterTopology {
+
+ private static final int NUMBEROFTWEETS = 100;
+
+ /**
+ * FlatMapFunction to determine the language of tweets if possible
+ */
+ public static class SelectLanguageFlatMap extends
+ JSONParseFlatMap<String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Select the language from the incoming JSON text
+ */
+ @Override
+ public void flatMap(String value, Collector<String> out) throws Exception {
+ try{
+ out.collect(getString(value, "lang"));
+ }
+ catch (JSONException e){
+ out.collect("");
+ }
+ }
+
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ String path = new String();
+
+ if (args != null && args.length == 1) {
+ path = args[0];
+ } else {
+ System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
+
+
+ DataStream<Tuple2<String, Integer>> dataStream = streamSource
+ .flatMap(new SelectLanguageFlatMap())
+ .map(new MapFunction<String, Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> map(String value) throws Exception {
+ return new Tuple2<String, Integer>(value, 1);
+ }
+ })
+ .keyBy(0)
+ .sum(1);
+
+ dataStream.print();
+
+ env.execute();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties b/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
new file mode 100644
index 0000000..1ca4143
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+secret=***
+consumerSecret=***
+token=***-***
+consumerKey=***
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b1d4115
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+ private String jsonText;
+ private String searchedField;
+
+ public JSONParserTest(String text, String field) {
+ jsonText = text;
+ searchedField = field;
+ }
+
+ @Parameters
+ public static Collection<Object[]> initParameterList() {
+
+ Object[][] parameterList = new Object[][] {
+ { "{\"key\":\"value\"}", "key" },
+ { "{\"key\":[\"value\"]}", "key[0]" },
+ { "{\"key\":[{\"key\":\"value\"}]}", "key[0].key" },
+ { "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", "key[0].key[0].key"},
+ { "{\"key\":[1,[{\"key\":\"value\"}]]}", "key[1][0].key" },
+ { "{\"key\":[1,[[\"key\",2,\"value\"]]]}", "key[1][0][2]" },
+ { "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+ };
+
+ return Arrays.asList(parameterList);
+ }
+
+ @Test
+ public void test() {
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+ String expected = "{\"retValue\":\"value\"}";
+
+ assertTrue(expected.equals(jo.toString()));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..8851086
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+
+ @Test
+ public void testGetBooleanFunction() {
+ String jsonText = "{\"key\":true}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertTrue(jo.getBoolean("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetDoubleFunction() {
+ double expected = 12345.12345;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getDouble("retValue"),0.000001);
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetIntFunction() {
+ int expected = 15;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getInt("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testGetLongFunction() {
+ long expected = 111111111111L;
+ String jsonText = "{\"key\":" + expected + "}";
+ String searchedField = "key";
+ try {
+ JSONParser parser = new JSONParser(jsonText);
+ JSONObject jo = parser.parse(searchedField);
+
+ assertEquals(expected,jo.getLong("retValue"));
+ }
+ catch (JSONException e) {
+ fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9ede613
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<configuration>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>
+ <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/pom.xml b/flink-streaming-connectors/pom.xml
new file mode 100644
index 0000000..dc395b2
--- /dev/null
+++ b/flink-streaming-connectors/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <name>flink-streaming-connectors</name>
+
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>flink-connector-flume</module>
+ <module>flink-connector-kafka</module>
+ <module>flink-connector-elasticsearch</module>
+ <module>flink-connector-rabbitmq</module>
+ <module>flink-connector-twitter</module>
+ <module>flink-connector-nifi</module>
+ </modules>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ <modules>
+ <!-- Include the flink-fs-tests project only for HD2.
+ The HDFS minicluster interfaces changed between the two versions.
+ -->
+ <module>flink-connector-filesystem</module>
+ </modules>
+ </profile>
+ </profiles>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/pom.xml b/flink-streaming-examples/pom.xml
new file mode 100644
index 0000000..5e06411
--- /dev/null
+++ b/flink-streaming-examples/pom.xml
@@ -0,0 +1,535 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-parent</artifactId>
+ <version>0.10-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-streaming-examples</artifactId>
+ <name>flink-streaming-examples</name>
+
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-scala</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-twitter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- get default data from flink-java-examples package -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <version>2.9</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <execution>
+ <id>unpack</id>
+ <phase>prepare-package</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <!-- For WordCount example data -->
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java-examples</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ <includes>org/apache/flink/examples/java/wordcount/util/WordCountData.class</includes>
+ </artifactItem>
+ <!-- For JSON utilities -->
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-twitter</artifactId>
+ <version>${project.version}</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/classes</outputDirectory>
+ <includes>org/apache/flink/streaming/connectors/json/*</includes>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- self-contained jars for each example -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version><!--$NO-MVN-MAN-VER$-->
+ <executions>
+ <!-- Default Execution -->
+ <execution>
+ <id>default</id>
+ <phase>package</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+
+ <!-- Iteration -->
+ <execution>
+ <id>Iteration</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>Iteration</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.iteration.IterateExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/iteration/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- IncrementalLearning -->
+ <execution>
+ <id>IncrementalLearning</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>IncrementalLearning</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.ml.IncrementalLearningSkeleton</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/ml/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- Twitter -->
+ <execution>
+ <id>Twitter</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>Twitter</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.twitter.TwitterStream</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/twitter/*.class</include>
+ <include>org/apache/flink/streaming/examples/twitter/util/*.class</include>
+ <include>org/apache/flink/streaming/connectors/json/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WindowJoin -->
+ <execution>
+ <id>WindowJoin</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WindowJoin</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.join.WindowJoin</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/join/*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WordCountPOJO -->
+ <execution>
+ <id>WordCountPOJO</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCountPOJO</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.wordcount.PojoExample</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoExample.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/PojoExample$*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WordCount -->
+ <execution>
+ <id>WordCount</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WordCount</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.wordcount.WordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- WindowWordCount -->
+ <execution>
+ <id>WindowWordCount</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>WindowWordCount</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.WindowWordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/WindowWordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+ <include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- SocketTextStreamWordCount -->
+ <execution>
+ <id>SocketTextStreamWordCount</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>SocketTextStreamWordCount</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.socket.SocketTextStreamWordCount</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/socket/SocketTextStreamWordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount.class</include>
+ <include>org/apache/flink/streaming/examples/wordcount/WordCount$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- TopSpeedWindowing -->
+ <execution>
+ <id>TopSpeedWindowing</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>TopSpeedWindowing</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.TopSpeedWindowing</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ <!-- SessionWindowing -->
+ <execution>
+ <id>SessionWindowing</id>
+ <phase>package</phase>
+ <goals>
+ <goal>jar</goal>
+ </goals>
+ <configuration>
+ <classifier>SessionWindowing</classifier>
+
+ <archive>
+ <manifestEntries>
+ <program-class>org.apache.flink.streaming.examples.windowing.SessionWindowing</program-class>
+ </manifestEntries>
+ </archive>
+
+ <includes>
+ <include>org/apache/flink/streaming/examples/windowing/SessionWindowing.class</include>
+ <include>org/apache/flink/streaming/examples/windowing/SessionWindowing$*.class</include>
+ </includes>
+ </configuration>
+ </execution>
+
+ </executions>
+ </plugin>
+
+
+ <!-- Scala Compiler -->
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>3.1.4</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+ scala classes can be resolved later in the (Java) test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <jvmArgs>
+ <jvmArg>-Xms128m</jvmArg>
+ <jvmArg>-Xmx512m</jvmArg>
+ </jvmArgs>
+ </configuration>
+ </plugin>
+
+ <!-- Eclipse Integration -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-eclipse-plugin</artifactId>
+ <version>2.8</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <projectnatures>
+ <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+ <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+ </projectnatures>
+ <buildcommands>
+ <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+ </buildcommands>
+ <classpathContainers>
+ <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+ <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+ </classpathContainers>
+ <excludes>
+ <exclude>org.scala-lang:scala-library</exclude>
+ <exclude>org.scala-lang:scala-compiler</exclude>
+ </excludes>
+ <sourceIncludes>
+ <sourceInclude>**/*.scala</sourceInclude>
+ <sourceInclude>**/*.java</sourceInclude>
+ </sourceIncludes>
+ </configuration>
+ </plugin>
+
+ <!-- Adding scala source directories to build path -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- Add src/main/scala to eclipse build path -->
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/main/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ <!-- Add src/test/scala to eclipse build path -->
+ <execution>
+ <id>add-test-source</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/test/scala</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ <version>0.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <verbose>false</verbose>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <failOnWarning>false</failOnWarning>
+ <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+ <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+ <configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
+ <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+ <outputEncoding>UTF-8</outputEncoding>
+ </configuration>
+ </plugin>
+ </plugins>
+
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <versionRange>[2.9,)</versionRange>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
new file mode 100644
index 0000000..2cf66b9
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeStream;
+import org.apache.flink.streaming.api.datastream.SplitStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Example illustrating iterations in Flink streaming. <p/> <p> The program sums up random numbers and counts additions
+ * it performs to reach a specific threshold in an iterative streaming fashion. </p>
+ * <p/>
+ * <p/>
+ * This example shows how to use: <ul> <li>streaming iterations, <li>buffer timeout to enhance latency, <li>directed
+ * outputs. </ul>
+ */
+public class IterateExample {
+
+ private static final int BOUND = 100;
+
+ // *************************************************************************
+ // PROGRAM
+ // *************************************************************************
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ // set up input for the stream of integer pairs
+
+ // obtain execution environment and set setBufferTimeout to 1 to enable
+ // continuous flushing of the output buffers (lowest latency)
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
+ .setBufferTimeout(1);
+
+ // create input stream of integer pairs
+ DataStream<Tuple2<Integer, Integer>> inputStream;
+ if (fileInput) {
+ inputStream = env.readTextFile(inputPath).map(new FibonacciInputMap());
+ } else {
+ inputStream = env.addSource(new RandomFibonacciSource());
+ }
+
+ // create an iterative data stream from the input with 5 second timeout
+ IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
+ .iterate(5000);
+
+ // apply the step function to get the next Fibonacci number
+ // increment the counter and split the output with the output selector
+ SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> step = it.map(new Step())
+ .split(new MySelector());
+
+ // close the iteration by selecting the tuples that were directed to the
+ // 'iterate' channel in the output selector
+ it.closeWith(step.select("iterate"));
+
+ // to produce the final output select the tuples directed to the
+ // 'output' channel then get the input pairs that have the greatest iteration counter
+ // on a 1 second sliding window
+ DataStream<Tuple2<Tuple2<Integer, Integer>, Integer>> numbers = step.select("output")
+ .map(new OutputMap());
+
+ // emit results
+ if (fileOutput) {
+ numbers.writeAsText(outputPath, 1);
+ } else {
+ numbers.print();
+ }
+
+ // execute the program
+ env.execute("Streaming Iteration Example");
+ }
+
+ // *************************************************************************
+ // USER FUNCTIONS
+ // *************************************************************************
+
+ /**
+ * Generate BOUND number of random integer pairs from the range from 0 to BOUND/2
+ */
+ private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ private Random rnd = new Random();
+
+ private volatile boolean isRunning = true;
+ private int counter = 0;
+
+ @Override
+ public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+
+ while (isRunning && counter < BOUND) {
+ int first = rnd.nextInt(BOUND / 2 - 1) + 1;
+ int second = rnd.nextInt(BOUND / 2 - 1) + 1;
+
+ ctx.collect(new Tuple2<Integer, Integer>(first, second));
+ counter++;
+ Thread.sleep(50L);
+ }
+ }
+
+ @Override
+ public void cancel() {
+ isRunning = false;
+ }
+ }
+
+ /**
+ * Generate random integer pairs from the range from 0 to BOUND/2
+ */
+ private static class FibonacciInputMap implements MapFunction<String, Tuple2<Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Integer, Integer> map(String value) throws Exception {
+ String record = value.substring(1, value.length() - 1);
+ String[] splitted = record.split(",");
+ return new Tuple2<Integer, Integer>(Integer.parseInt(splitted[0]), Integer.parseInt(splitted[1]));
+ }
+ }
+
+ /**
+ * Map the inputs so that the next Fibonacci numbers can be calculated while preserving the original input tuple A
+ * counter is attached to the tuple and incremented in every iteration step
+ */
+ public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer,
+ Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws
+ Exception {
+ return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f0, value.f1, 0);
+ }
+ }
+
+ /**
+ * Iteration step function that calculates the next Fibonacci number
+ */
+ public static class Step implements
+ MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>, Tuple5<Integer, Integer, Integer,
+ Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple5<Integer, Integer, Integer, Integer,
+ Integer> value) throws Exception {
+ return new Tuple5<Integer, Integer, Integer, Integer, Integer>(value.f0, value.f1, value.f3, value.f2 +
+ value.f3, ++value.f4);
+ }
+ }
+
+ /**
+ * OutputSelector testing which tuple needs to be iterated again.
+ */
+ public static class MySelector implements OutputSelector<Tuple5<Integer, Integer, Integer, Integer, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Iterable<String> select(Tuple5<Integer, Integer, Integer, Integer, Integer> value) {
+ List<String> output = new ArrayList<String>();
+ if (value.f2 < BOUND && value.f3 < BOUND) {
+ output.add("iterate");
+ } else {
+ output.add("output");
+ }
+ return output;
+ }
+ }
+
+ /**
+ * Giving back the input pair and the counter
+ */
+ public static class OutputMap implements MapFunction<Tuple5<Integer, Integer, Integer, Integer, Integer>,
+ Tuple2<Tuple2<Integer, Integer>, Integer>> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<Tuple2<Integer, Integer>, Integer> map(Tuple5<Integer, Integer, Integer, Integer, Integer>
+ value) throws
+ Exception {
+ return new Tuple2<Tuple2<Integer, Integer>, Integer>(new Tuple2<Integer, Integer>(value.f0, value.f1),
+ value.f4);
+ }
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileInput = false;
+ private static boolean fileOutput = false;
+ private static String inputPath;
+ private static String outputPath;
+
+ private static boolean parseParameters(String[] args) {
+
+ if (args.length > 0) {
+ // parse input arguments
+ if (args.length == 1) {
+ fileOutput = true;
+ outputPath = args[0];
+ } else if (args.length == 2) {
+ fileInput = true;
+ inputPath = args[0];
+ fileOutput = true;
+ outputPath = args[1];
+ } else {
+ System.err.println("Usage: IterateExample <result path>");
+ return false;
+ }
+ } else {
+ System.out.println("Executing IterateExample with generated data.");
+ System.out.println(" Provide parameter to write to file.");
+ System.out.println(" Usage: IterateExample <result path>");
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
----------------------------------------------------------------------
diff --git a/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
new file mode 100644
index 0000000..0077459
--- /dev/null
+++ b/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iteration/util/IterateExampleData.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.iteration.util;
+
+public class IterateExampleData {
+ public static final String INPUT_PAIRS = "(1,40)\n" + "(29,38)\n" + "(11,15)\n" + "(17,39)\n" + "(24,41)\n" +
+ "(7,33)\n" + "(20,2)\n" + "(11,5)\n" + "(3,16)\n" + "(23,36)\n" + "(15,23)\n" + "(28,13)\n" + "(1,1)\n" +
+ "(10,6)\n" + "(21,5)\n" + "(14,36)\n" + "(17,15)\n" + "(7,9)";
+
+ public static final String RESULTS = "((1,40),3)\n" + "((24,41),2)\n" + "((3,16),5)\n" + "((1,1),10)\n" +
+ "((17,15),4)\n" + "((29,38),2)\n" + "((7,33),3)\n" + "((23,36),3)\n" + "((10,6),6)\n" + "((7,9),5)\n" +
+ "((11,15),4)\n" + "((20,2),5)\n" + "((15,23),4)\n" + "((21,5),5)\n" +
+ "((17,39),3)\n" + "((11,5),6)\n" + "((28,13),4)\n" + "((14,36),3)";
+
+ private IterateExampleData() {
+ }
+}