You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:04:01 UTC

[45/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
deleted file mode 100644
index d373d63..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements. See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License. You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-<template><description></description><name>NiFi_Flink</name><snippet><connections><id>34acfdda-dd21-48c0-8779-95d0e258f5cb</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>769242e5-ee04-4656-a684-ca661a18eed6</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>59574e3b-1ba7-4343-b265-af1b67923a85</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThresh
 old>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>48042218-a51e-45c7-bd30-2290bba8b191</id><type>OUTPUT_PORT</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><selectedRelationships>success</selectedRelationships><source><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><type>PROCESSOR</type></source><zIndex>0</zIndex></connections><connections><id>46c9343f-f732-4e2d-98e1-13caab5d2f5e</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><backPressureDataSizeThreshold>0 MB</backPressureDataSizeThreshold><backPressureObjectThreshold>0</backPressureObjectThreshold><destination><groupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><type>PROCESSOR</type></destination><flowFileExpiration>0 sec</flowFileExpiration><labelIndex>1</labelIndex><name></name><source><groupI
 d>0f854f2b-239f-45f0-bfed-48b5b23f7928</groupId><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><type>INPUT_PORT</type></source><zIndex>0</zIndex></connections><inputPorts><id>cd8c5227-cfa9-4603-9472-b2234d7bd741</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>395.0</x><y>520.0</y></position><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data from Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>INPUT_PORT</type></inputPorts><outputPorts><id>48042218-a51e-45c7-bd30-2290bba8b191</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1616.0</x><y>259.0</y></position><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><name>Data for Flink</name><state>RUNNING</state><transmitting>false</transmitting><type>OUTPUT_PORT</type></outputPorts><processors><id>769242e5-ee04-4656-a684-ca661a18eed6</id><parentGroupId>0f854f2b-239f-45f0-bfed-48
 b5b23f7928</parentGroupId><position><x>389.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>File Size</key><value><description>The size of the file that will be used</description><displayName>File Size</displayName><dynamic>false</dynamic><name>File Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Batch Size</key><value><defaultValue>1</defaultValue><description>The number of FlowFiles to be transferr
 ed in each invocation</description><displayName>Batch Size</displayName><dynamic>false</dynamic><name>Batch Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Data Format</key><value><allowableValues><displayName>Binary</displayName><value>Binary</value></allowableValues><allowableValues><displayName>Text</displayName><value>Text</value></allowableValues><defaultValue>Binary</defaultValue><description>Specifies whether the data should be Text or Binary</description><displayName>Data Format</displayName><dynamic>false</dynamic><name>Data Format</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Unique FlowFiles</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, ea
 ch FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles will get the same content but this offers much higher throughput</description><displayName>Unique FlowFiles</displayName><dynamic>false</dynamic><name>Unique FlowFiles</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>File Size</key><value>1 b</value></entry><entry><key>Batch Size</key><value>1</value></entry><entry><key>Data Format</key><value>Binary</value></entry><entry><key>Unique FlowFiles</key><value>false</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>2 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>GenerateFlowFile</name><relationships><autoTerminate>false</autoTerminate><description></des
 cription><name>success</name></relationships><state>STOPPED</state><style/><supportsEventDriven>false</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.GenerateFlowFile</type></processors><processors><id>e3a9026f-dae1-42ca-851c-02d9fda22094</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>826.0</x><y>499.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><entry><key>Log Level</key><value><al
 lowableValues><displayName>trace</displayName><value>trace</value></allowableValues><allowableValues><displayName>debug</displayName><value>debug</value></allowableValues><allowableValues><displayName>info</displayName><value>info</value></allowableValues><allowableValues><displayName>warn</displayName><value>warn</value></allowableValues><allowableValues><displayName>error</displayName><value>error</value></allowableValues><defaultValue>info</defaultValue><description>The Log Level to use when logging the Attributes</description><displayName>Log Level</displayName><dynamic>false</dynamic><name>Log Level</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Log Payload</key><value><allowableValues><displayName>true</displayName><value>true</value></allowableValues><allowableValues><displayName>false</displayName><value>false</value></allowableValues><defaultValue>false</defaultValue><description>If true, the FlowFile's p
 ayload will be logged, in addition to its attributes; otherwise, just the Attributes will be logged.</description><displayName>Log Payload</displayName><dynamic>false</dynamic><name>Log Payload</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Log</key><value><description>A comma-separated list of Attributes to Log. If not specified, all attributes will be logged.</description><displayName>Attributes to Log</displayName><dynamic>false</dynamic><name>Attributes to Log</name><required>false</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Attributes to Ignore</key><value><description>A comma-separated list of Attributes to ignore. If not specified, no attributes will be ignored.</description><displayName>Attributes to Ignore</displayName><dynamic>false</dynamic><name>Attributes to Ignore</name><required>false</required><sensitive>false</sensitive><supportsEl>
 false</supportsEl></value></entry><entry><key>Log prefix</key><value><description>Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.</description><displayName>Log prefix</displayName><dynamic>false</dynamic><name>Log prefix</name><required>false</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Log Level</key></entry><entry><key>Log Payload</key><value>true</value></entry><entry><key>Attributes to Log</key></entry><entry><key>Attributes to Ignore</key></entry><entry><key>Log prefix</key></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>LogAttribute</name><relationships><autoTerminate>true</autoTerminate><description>All FlowFil
 es are routed to this relationship</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supportsEventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.LogAttribute</type></processors><processors><id>8b8b6a2f-ee24-4f32-a178-248f3a3f5482</id><parentGroupId>0f854f2b-239f-45f0-bfed-48b5b23f7928</parentGroupId><position><x>1000.0</x><y>231.0</y></position><config><bulletinLevel>WARN</bulletinLevel><comments></comments><concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount><defaultConcurrentTasks><entry><key>TIMER_DRIVEN</key><value>1</value></entry><entry><key>EVENT_DRIVEN</key><value>0</value></entry><entry><key>CRON_DRIVEN</key><value>1</value></entry></defaultConcurrentTasks><defaultSchedulingPeriod><entry><key>TIMER_DRIVEN</key><value>0 sec</value></entry><entry><key>CRON_DRIVEN</key><value>* * * * * ?</value></entry></defaultSchedulingPeriod><descriptors><e
 ntry><key>Regular Expression</key><value><defaultValue>(?s:^.*$)</defaultValue><description>The Regular Expression to search for in the FlowFile content</description><displayName>Regular Expression</displayName><dynamic>false</dynamic><name>Regular Expression</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Replacement Value</key><value><defaultValue>$1</defaultValue><description>The value to replace the regular expression with. Back-references to Regular Expression capturing groups are supported, but back-references that reference capturing groups that do not exist in the regular expression will be treated as literal value.</description><displayName>Replacement Value</displayName><dynamic>false</dynamic><name>Replacement Value</name><required>true</required><sensitive>false</sensitive><supportsEl>true</supportsEl></value></entry><entry><key>Character Set</key><value><defaultValue>UTF-8</defaultValue><description>The
  Character Set in which the file is encoded</description><displayName>Character Set</displayName><dynamic>false</dynamic><name>Character Set</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Maximum Buffer Size</key><value><defaultValue>1 MB</defaultValue><description>Specifies the maximum amount of data to buffer (per file or per line, depending on the Evaluation Mode) in order to apply the regular expressions. If 'Entire Text' (in Evaluation Mode) is selected and the FlowFile is larger than this value, the FlowFile will be routed to 'failure'. In 'Line-by-Line' Mode, if a single line is larger than this value, the FlowFile will be routed to 'failure'. A default value of 1 MB is provided, primarily for 'Entire Text' mode. In 'Line-by-Line' Mode, a value such as 8 KB or 16 KB is suggested. This value is ignored and the buffer is not used if 'Regular Expression' is set to '.*'</description><displayName>Maximum Buffer 
 Size</displayName><dynamic>false</dynamic><name>Maximum Buffer Size</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry><entry><key>Evaluation Mode</key><value><allowableValues><displayName>Line-by-Line</displayName><value>Line-by-Line</value></allowableValues><allowableValues><displayName>Entire text</displayName><value>Entire text</value></allowableValues><defaultValue>Entire text</defaultValue><description>Evaluate the 'Regular Expression' against each line (Line-by-Line) or buffer the entire file into memory (Entire Text) and then evaluate the 'Regular Expression'.</description><displayName>Evaluation Mode</displayName><dynamic>false</dynamic><name>Evaluation Mode</name><required>true</required><sensitive>false</sensitive><supportsEl>false</supportsEl></value></entry></descriptors><lossTolerant>false</lossTolerant><penaltyDuration>30 sec</penaltyDuration><properties><entry><key>Regular Expression</key><value>(?s:^.*$)</value><
 /entry><entry><key>Replacement Value</key><value>blah blah</value></entry><entry><key>Character Set</key><value>UTF-8</value></entry><entry><key>Maximum Buffer Size</key><value>1 MB</value></entry><entry><key>Evaluation Mode</key><value>Entire text</value></entry></properties><runDurationMillis>0</runDurationMillis><schedulingPeriod>0 sec</schedulingPeriod><schedulingStrategy>TIMER_DRIVEN</schedulingStrategy><yieldDuration>1 sec</yieldDuration></config><name>ReplaceText</name><relationships><autoTerminate>true</autoTerminate><description>FlowFiles that could not be updated are routed to this relationship</description><name>failure</name></relationships><relationships><autoTerminate>false</autoTerminate><description>FlowFiles that have been successfully updated are routed to this relationship, as well as FlowFiles whose content does not match the given Regular Expression</description><name>success</name></relationships><state>RUNNING</state><style/><supportsEventDriven>true</supports
 EventDriven><supportsParallelProcessing>true</supportsParallelProcessing><type>org.apache.nifi.processors.standard.ReplaceText</type></processors></snippet><timestamp>09/30/2015 09:10:38 EDT</timestamp></template>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
deleted file mode 100644
index a361920..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/pom.xml
+++ /dev/null
@@ -1,59 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-rabbitmq</artifactId>
-	<name>flink-connector-rabbitmq</name>
-
-	<packaging>jar</packaging>
-
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<rabbitmq.version>3.3.1</rabbitmq.version>
-	</properties>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.rabbitmq</groupId>
-			<artifactId>amqp-client</artifactId>
-			<version>${rabbitmq.version}</version>
-		</dependency>
-
-	</dependencies>
-
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
deleted file mode 100644
index fa729d6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-
-public class RMQSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(RMQSink.class);
-
-	private String QUEUE_NAME;
-	private String HOST_NAME;
-	private transient ConnectionFactory factory;
-	private transient Connection connection;
-	private transient Channel channel;
-	private SerializationSchema<IN, byte[]> schema;
-
-	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
-		this.schema = schema;
-	}
-
-	/**
-	 * Initializes the connection to RMQ.
-	 */
-	public void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
-		try {
-			connection = factory.newConnection();
-			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to RMQ.
-	 * 
-	 * @param value
-	 *            The incoming data
-	 */
-	@Override
-	public void invoke(IN value) {
-		try {
-			byte[] msg = schema.serialize(value);
-
-			channel.basicPublish("", QUEUE_NAME, null, msg);
-
-		} catch (IOException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Cannot send RMQ message {} at {}", QUEUE_NAME, HOST_NAME);
-			}
-		}
-
-	}
-
-	/**
-	 * Closes the connection.
-	 */
-	private void closeChannel() {
-		try {
-			channel.close();
-			connection.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
-		}
-
-	}
-
-	@Override
-	public void open(Configuration config) {
-		initializeConnection();
-	}
-
-	@Override
-	public void close() {
-		closeChannel();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
deleted file mode 100644
index b18b8d8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-
-public class RMQSource<OUT> extends ConnectorSource<OUT> {
-	private static final long serialVersionUID = 1L;
-
-	private final String QUEUE_NAME;
-	private final String HOST_NAME;
-
-	private transient ConnectionFactory factory;
-	private transient Connection connection;
-	private transient Channel channel;
-	private transient QueueingConsumer consumer;
-	private transient QueueingConsumer.Delivery delivery;
-
-	private transient volatile boolean running;
-
-	public RMQSource(String HOST_NAME, String QUEUE_NAME,
-			DeserializationSchema<OUT> deserializationSchema) {
-		super(deserializationSchema);
-		this.HOST_NAME = HOST_NAME;
-		this.QUEUE_NAME = QUEUE_NAME;
-	}
-
-	/**
-	 * Initializes the connection to RMQ.
-	 */
-	private void initializeConnection() {
-		factory = new ConnectionFactory();
-		factory.setHost(HOST_NAME);
-		try {
-			connection = factory.newConnection();
-			channel = connection.createChannel();
-			channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-			consumer = new QueueingConsumer(channel);
-			channel.basicConsume(QUEUE_NAME, true, consumer);
-		} catch (IOException e) {
-			throw new RuntimeException("Cannot create RMQ connection with " + QUEUE_NAME + " at "
-					+ HOST_NAME, e);
-		}
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		initializeConnection();
-		running = true;
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		try {
-			connection.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing RMQ connection with " + QUEUE_NAME
-					+ " at " + HOST_NAME, e);
-		}
-	}
-
-	@Override
-	public void run(SourceContext<OUT> ctx) throws Exception {
-		while (running) {
-			delivery = consumer.nextDelivery();
-
-			OUT result = schema.deserialize(delivery.getBody());
-			if (schema.isEndOfStream(result)) {
-				break;
-			}
-
-			ctx.collect(result);
-		}
-	}
-
-	@Override
-	public void cancel() {
-		running = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
deleted file mode 100644
index 1f85862..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.rabbitmq;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-
-public class RMQTopology {
-
-	public static void main(String[] args) throws Exception {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		@SuppressWarnings("unused")
-		DataStreamSink<String> dataStream1 = env.addSource(
-				new RMQSource<String>("localhost", "hello", new SimpleStringSchema())).print();
-
-		@SuppressWarnings("unused")
-		DataStreamSink<String> dataStream2 = env.fromElements("one", "two", "three", "four", "five",
-				"q").addSink(
-				new RMQSink<String>("localhost", "hello", new StringToByteSerializer()));
-
-		env.execute();
-	}
-
-	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public byte[] serialize(String element) {
-			return element.getBytes();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
deleted file mode 100644
index f1ef0f0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/pom.xml
+++ /dev/null
@@ -1,97 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-connectors-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-twitter</artifactId>
-	<name>flink-connector-twitter</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>com.twitter</groupId>
-			<artifactId>hbc-core</artifactId>
-			<version>2.2.0</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.sling</groupId>
-			<artifactId>org.apache.sling.commons.json</artifactId>
-			<version>2.0.6</version>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<configuration>
-							<artifactSet>
-								<includes combine.children="append">
-									<!-- We include all dependencies that transitively depend on guava -->
-									<include>com.twitter:hbc-core</include>
-									<include>com.twitter:joauth</include>
-								</includes>
-							</artifactSet>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
deleted file mode 100644
index 0f16541..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
- * 
- * @param <IN>
- *            Type of the input elements.
- * @param <OUT>
- *            Type of the returned elements.
- */
-public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	// private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
-
-	/**
-	 * Get the value object associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public Object get(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).get("retValue");
-	}
-
-	/**
-	 * Get the boolean value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public boolean getBoolean(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getBoolean("retValue");
-	}
-
-	/**
-	 * Get the double value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public double getDouble(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getDouble("retValue");
-	}
-
-	/**
-	 * Get the int value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public int getInt(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getInt("retValue");
-	}
-
-	/**
-	 * Get the long value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public long getLong(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getLong("retValue");
-	}
-	
-	/**
-	 * Get the String value associated with a key form a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public String getString(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-		
-		return parser.parse(field).getString("retValue");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
deleted file mode 100644
index c1eabbd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.StringTokenizer;
-
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-/**
- * A JSONParser contains a JSONObject and provides opportunity to access
- * embedded fields in JSON code.
- */
-public class JSONParser {
-
-	private JSONObject originalJO;
-	private String searchedfield;
-	private Object temp;
-
-	/**
-	 * Construct a JSONParser from a string. The string has to be a JSON code
-	 * from which we want to get a field.
-	 * 
-	 * @param jsonText
-	 *            A string which contains a JSON code. String representation of
-	 *            a JSON code.
-	 * @throws JSONException
-	 *             If there is a syntax error in the source string.
-	 */
-	public JSONParser(String jsonText) throws JSONException {
-		originalJO = new JSONObject(jsonText);
-	}
-
-	/**
-	 * 
-	 * Parse the JSON code passed to the constructor to find the given key.
-	 * 
-	 * @param key
-	 *            The key whose value is searched for.
-	 * @return A JSONObject which has only one field called "retValue" and the
-	 *         value associated to it is the searched value. The methods of
-	 *         JSONObject can be used to get the field value in a desired
-	 *         format.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	public JSONObject parse(String key) throws JSONException {
-		initializeParser(key);
-		parsing();
-		return putResultInJSONObj();
-	}
-
-	/**
-	 * Prepare the fields of the class for the parsing
-	 * 
-	 * @param key
-	 *            The key whose value is searched for.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void initializeParser(String key) throws JSONException {
-		searchedfield = key;
-		temp = new JSONObject(originalJO.toString());
-	}
-
-	/**
-	 * This function goes through the given field and calls the appropriate
-	 * functions to treat the units between the punctuation marks.
-	 * 
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void parsing() throws JSONException {
-		StringTokenizer st = new StringTokenizer(searchedfield, ".");
-		while (st.hasMoreTokens()) {
-			find(st.nextToken());
-		}
-	}
-
-	/**
-	 * Search for the next part of the field and update the state if it was
-	 * found.
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void find(String nextToken) throws JSONException {
-		if (endsWithBracket(nextToken)) {
-			treatAllBracket(nextToken);
-		} else {
-			temp = ((JSONObject) temp).get(nextToken);
-		}
-	}
-
-	/**
-	 * Determine whether the given string ends with a closing square bracket ']'
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @return True if the given string ends with a closing square bracket ']'
-	 *         and false otherwise.
-	 */
-	private boolean endsWithBracket(String nextToken) {
-		return nextToken.substring(nextToken.length() - 1).endsWith("]");
-	}
-
-	/**
-	 * Handle (multidimensional) arrays. Treat the square bracket pairs one
-	 * after the other if necessary.
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @throws JSONException
-	 *             If the searched element is not found.
-	 */
-	private void treatAllBracket(String nextToken) throws JSONException {
-		List<String> list = Arrays.asList(nextToken.split("\\["));
-		ListIterator<String> iter = list.listIterator();
-
-		temp = ((JSONObject) temp).get(iter.next());
-
-		while (iter.hasNext()) {
-			int index = Integer.parseInt(cutBracket(iter.next()));
-			temp = ((JSONArray) temp).get(index);
-		}
-	}
-
-	/**
-	 * Remove the last character of the string.
-	 * 
-	 * @param string
-	 *            String to modify.
-	 * @return The given string without the last character.
-	 */
-	private String cutBracket(String string) {
-		return string.substring(0, string.length() - 1);
-	}
-
-	/**
-	 * Save the result of the search into a JSONObject.
-	 * 
-	 * @return A special JSONObject which contain only one key. The value
-	 *         associated to this key is the result of the search.
-	 * @throws JSONException
-	 *             If there is a problem creating the JSONObject. (e.g. invalid
-	 *             syntax)
-	 */
-	private JSONObject putResultInJSONObj() throws JSONException {
-		JSONObject jo = new JSONObject();
-		jo.put("retValue", temp);
-		return jo;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
deleted file mode 100644
index 8dd4458..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSource.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.hbc.core.endpoint.Location;
-import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
-import com.twitter.hbc.httpclient.auth.Authentication;
-
-/**
- * 
- * An extension of {@link TwitterSource} by filter parameters. This extension
- * enables to filter the twitter stream by user defined parameters.
- */
-public class TwitterFilterSource extends TwitterSource {
-
-	private static final Logger LOG = LoggerFactory
-			.getLogger(TwitterFilterSource.class);
-
-	private static final long serialVersionUID = 1L;
-
-	private List<String> trackTerms = new LinkedList<String>();
-
-	private List<String> languages = new LinkedList<String>();
-
-	private List<Long> followings = new LinkedList<Long>();
-
-	private List<Location> locations = new LinkedList<Location>();
-
-	private Map<String, String> queryParameters = new HashMap<String, String>();
-
-	private Map<String, String> postParameters = new HashMap<String, String>();
-
-	public TwitterFilterSource(String authPath) {
-		super(authPath);
-	}
-
-	@Override
-	protected void initializeConnection() {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initializing Twitter Streaming API connection");
-		}
-		queue = new LinkedBlockingQueue<String>(queueSize);
-
-		StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
-		configEndpoint(endpoint);
-		endpoint.stallWarnings(false);
-
-		Authentication auth = authenticate();
-
-		initializeClient(endpoint, auth);
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Twitter Streaming API connection established successfully");
-		}
-	}
-
-	/**
-	 * This function configures the streaming endpoint
-	 * 
-	 * @param endpoint
-	 *            The streaming endpoint
-	 */
-	private void configEndpoint(StatusesFilterEndpoint endpoint) {
-		if (!trackTerms.isEmpty()) {
-			endpoint.trackTerms(trackTerms);
-		}
-		if (!languages.isEmpty()) {
-			endpoint.languages(languages);
-		}
-		if (!followings.isEmpty()) {
-			endpoint.followings(followings);
-		}
-		if (!locations.isEmpty()) {
-			endpoint.locations(locations);
-		}
-		if (!queryParameters.isEmpty()) {
-			for (Entry<String, String> entry : queryParameters.entrySet()) {
-				endpoint.addQueryParameter(entry.getKey(), entry.getValue());
-			}
-		}
-		if (!postParameters.isEmpty()) {
-			for (Entry<String, String> entry : postParameters.entrySet()) {
-				endpoint.addPostParameter(entry.getKey(), entry.getValue());
-			}
-		}
-	}
-
-	/**
-	 * This function sets which term to track.
-	 * 
-	 * @param term
-	 *            The term to track.
-	 */
-	public void trackTerm(String term) {
-		this.trackTerms.add(term);
-	}
-
-	/**
-	 * This function sets which terms to track.
-	 * 
-	 * @param terms
-	 *            The terms to track.
-	 */
-	public void trackTerms(Collection<String> terms) {
-		this.trackTerms.addAll(terms);
-	}
-
-	/**
-	 * This function tells which terms are tracked.
-	 */
-	public List<String> getTrackTerms() {
-		return this.trackTerms;
-	}
-
-	/**
-	 * This function sets which language to filter.
-	 * 
-	 * @param language
-	 *            The language to filter.
-	 */
-	public void filterLanguage(String language) {
-		this.languages.add(language);
-	}
-
-	/**
-	 * This function sets which languages to filter.
-	 * 
-	 * @param languages
-	 *            The languages to filter.
-	 */
-	public void filterLanguages(Collection<String> languages) {
-		this.languages.addAll(languages);
-	}
-
-	/**
-	 * This function tells which languages are filtered.
-	 */
-	public List<String> getLanguages() {
-		return this.languages;
-	}
-
-	/**
-	 * This function sets which user to follow.
-	 * 
-	 * @param userID
-	 *            The ID of the user to follow.
-	 */
-	public void filterFollowings(Long userID) {
-		this.followings.add(userID);
-	}
-
-	/**
-	 * This function sets which users to follow.
-	 * 
-	 * @param userIDs
-	 *            The IDs of the users to follow.
-	 */
-	public void filterFollowings(Collection<Long> userIDs) {
-		this.followings.addAll(userIDs);
-	}
-
-	/**
-	 * This function tells which users are followed.
-	 */
-	public List<Long> getFollowings() {
-		return this.followings;
-	}
-
-	/**
-	 * This function sets which location to filter.
-	 * 
-	 * @param location
-	 *            The location to filter.
-	 */
-	public void filterLocation(Location location) {
-		this.locations.add(location);
-	}
-
-	/**
-	 * This function sets which locations to filter.
-	 * 
-	 * @param locations
-	 *            The locations to filter.
-	 */
-	public void filterLocations(Collection<Location> locations) {
-		this.locations.addAll(locations);
-	}
-
-	/**
-	 * This function tells which locations are filtered.
-	 */
-	public List<Location> getLocations() {
-		return this.locations;
-	}
-
-	/**
-	 * This function sets a query parameter.
-	 * 
-	 * @param parameter
-	 *            The name of the query parameter.
-	 * @param value
-	 *            The value of the query parameter.
-	 */
-	public void addQueryParameter(String parameter, String value) {
-		this.queryParameters.put(parameter, value);
-	}
-
-	/**
-	 * This function sets query parameters.
-	 * 
-	 * @param queryParameters
-	 *            The query parameters for the endpoint.
-	 */
-	public void addQueryParameters(Map<String, String> queryParameters) {
-		this.queryParameters.putAll(queryParameters);
-	}
-
-	/**
-	 * This function tells which query parameters are used by the endpoint.
-	 */
-	public Map<String, String> getQueryParameters() {
-		return this.queryParameters;
-	}
-
-	/**
-	 * This function sets a post parameter.
-	 * 
-	 * @param parameter
-	 *            The name of the post parameter.
-	 * @param value
-	 *            The value of the post parameter.
-	 */
-	public void addPostParameter(String parameter, String value) {
-		this.postParameters.put(parameter, value);
-	}
-
-	/**
-	 * This function sets post parameters.
-	 * 
-	 * @param postParameters
-	 *              The post parameters for the endpoint.
-	 */
-	public void addPostParameters(Map<String, String> postParameters) {
-		this.postParameters.putAll(postParameters);
-	}
-
-	/**
-	 * This function tells which post parameters are used by the endpoint.
-	 */
-	public Map<String, String> postParameters() {
-		return this.postParameters;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
deleted file mode 100644
index 43cb179..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterFilterSourceExample.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-
-/**
- * This is an example how to use TwitterFilterSource. Before executing the
- * example you have to define the access keys of twitter.properties in the
- * resource folder. The access keys can be found in your twitter account.
- */
-public class TwitterFilterSourceExample {
-
-	/**
-	 * path to the twitter properties
-	 */
-	private static final String PATH_TO_AUTH_FILE = "/twitter.properties";
-
-	public static void main(String[] args) {
-		final StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.getExecutionEnvironment();
-
-		TwitterFilterSource twitterSource = new TwitterFilterSource(
-				TwitterFilterSourceExample.class.getResource(PATH_TO_AUTH_FILE)
-						.getFile());
-
-		twitterSource.trackTerm("obama");
-		twitterSource.filterLanguage("en");
-
-		DataStream<String> streamSource = env.addSource(twitterSource).flatMap(
-				new JSONParseFlatMap<String, String>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public void flatMap(String s, Collector<String> c)
-							throws Exception {
-						c.collect(s);
-					}
-				});
-
-		streamSource.print();
-
-		try {
-			env.execute("Twitter Streaming Test");
-		} catch (Exception e) {
-			// TODO Auto-generated catch block
-			e.printStackTrace();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
deleted file mode 100644
index bad0f8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.hbc.ClientBuilder;
-import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.DefaultStreamingEndpoint;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
-import com.twitter.hbc.core.processor.StringDelimitedProcessor;
-import com.twitter.hbc.httpclient.BasicClient;
-import com.twitter.hbc.httpclient.auth.Authentication;
-import com.twitter.hbc.httpclient.auth.OAuth1;
-
-/**
- * Implementation of {@link SourceFunction} specialized to emit tweets from
- * Twitter. This is not a parallel source because the Twitter API only allows
- * two concurrent connections.
- */
-public class TwitterSource extends RichSourceFunction<String> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(TwitterSource.class);
-
-	private static final long serialVersionUID = 1L;
-	private String authPath;
-	protected transient BlockingQueue<String> queue;
-	protected int queueSize = 10000;
-	private transient BasicClient client;
-	private int waitSec = 5;
-
-	private int maxNumberOfTweets;
-	private int currentNumberOfTweets;
-
-	private transient volatile boolean isRunning;
-
-	/**
-	 * Create {@link TwitterSource} for streaming
-	 * 
-	 * @param authPath
-	 *            Location of the properties file containing the required
-	 *            authentication information.
-	 */
-	public TwitterSource(String authPath) {
-		this.authPath = authPath;
-		maxNumberOfTweets = -1;
-	}
-
-	/**
-	 * Create {@link TwitterSource} to collect finite number of tweets
-	 * 
-	 * @param authPath
-	 *            Location of the properties file containing the required
-	 *            authentication information.
-	 * @param numberOfTweets
-	 * 
-	 */
-	public TwitterSource(String authPath, int numberOfTweets) {
-		this.authPath = authPath;
-		this.maxNumberOfTweets = numberOfTweets;
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		initializeConnection();
-		currentNumberOfTweets = 0;
-		isRunning = true;
-	}
-
-	/**
-	 * Initialize Hosebird Client to be able to consume Twitter's Streaming API
-	 */
-	protected void initializeConnection() {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initializing Twitter Streaming API connection");
-		}
-
-		queue = new LinkedBlockingQueue<String>(queueSize);
-
-		StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
-		endpoint.stallWarnings(false);
-
-		Authentication auth = authenticate();
-
-		initializeClient(endpoint, auth);
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Twitter Streaming API connection established successfully");
-		}
-	}
-
-	protected OAuth1 authenticate() {
-
-		Properties authenticationProperties = loadAuthenticationProperties();
-		
-		return new OAuth1(authenticationProperties.getProperty("consumerKey"),
-				authenticationProperties.getProperty("consumerSecret"),
-				authenticationProperties.getProperty("token"),
-				authenticationProperties.getProperty("secret"));
-	}
-
-	/**
-	 * Reads the given properties file for the authentication data.
-	 * 
-	 * @return the authentication data.
-	 */
-	private Properties loadAuthenticationProperties() {
-		
-		Properties properties = new Properties();
-		try {
-			InputStream input = new FileInputStream(authPath);
-			properties.load(input);
-			input.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot open .properties file: " + authPath, e);
-		}
-		return properties;
-	}
-
-	protected void initializeClient(DefaultStreamingEndpoint endpoint, Authentication auth) {
-
-		client = new ClientBuilder().name("twitterSourceClient").hosts(Constants.STREAM_HOST)
-				.endpoint(endpoint).authentication(auth)
-				.processor(new StringDelimitedProcessor(queue)).build();
-		
-		client.connect();
-	}
-
-	@Override
-	public void close() {
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Initiating connection close");
-		}
-
-		if (client != null) {
-			client.stop();
-		}
-
-		if (LOG.isInfoEnabled()) {
-			LOG.info("Connection closed successfully");
-		}
-	}
-
-	/**
-	 * Get the size of the queue in which the tweets are contained temporarily.
-	 * 
-	 * @return the size of the queue in which the tweets are contained
-	 *         temporarily
-	 */
-	public int getQueueSize() {
-		return queueSize;
-	}
-
-	/**
-	 * Set the size of the queue in which the tweets are contained temporarily.
-	 * 
-	 * @param queueSize
-	 *            The desired value.
-	 */
-	public void setQueueSize(int queueSize) {
-		this.queueSize = queueSize;
-	}
-
-	/**
-	 * This function tells how long TwitterSource waits for the tweets.
-	 * 
-	 * @return Number of second.
-	 */
-	public int getWaitSec() {
-		return waitSec;
-	}
-
-	/**
-	 * This function sets how long TwitterSource should wait for the tweets.
-	 * 
-	 * @param waitSec
-	 *            The desired value.
-	 */
-	public void setWaitSec(int waitSec) {
-		this.waitSec = waitSec;
-	}
-
-	@Override
-	public void run(SourceContext<String> ctx) throws Exception {
-		while (isRunning) {
-			if (client.isDone()) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Client connection closed unexpectedly: {}", client.getExitEvent()
-							.getMessage());
-				}
-				break;
-			}
-
-			ctx.collect(queue.take());
-
-			if (maxNumberOfTweets != -1 && currentNumberOfTweets >= maxNumberOfTweets) {
-				break;
-			}
-		}
-	}
-
-	@Override
-	public void cancel() {
-		isRunning = false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
deleted file mode 100644
index a80c32a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterStreaming.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TwitterStreaming {
-
-	private static final int PARALLELISM = 1;
-	private static final int SOURCE_PARALLELISM = 1;
-	private static final int NUMBEROFTWEETS = 100;
-
-	private static final Logger LOG = LoggerFactory.getLogger(TwitterStreaming.class);
-
-	public static class TwitterSink implements SinkFunction<Tuple5<Long, Integer, String, String, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void invoke(Tuple5<Long, Integer, String, String, String> tuple) {
-			System.out.println("ID: " + tuple.f0 + " int: " + tuple.f1 + " LANGUAGE: " + tuple.f2);
-			System.out.println("NAME: " + tuple.f4);
-			System.out.println("TEXT: " + tuple.f3);
-			System.out.println("");
-		}
-
-	}
-
-	public static class SelectDataFlatMap extends
-			JSONParseFlatMap<String, Tuple5<Long, Integer, String, String, String>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(String value, Collector<Tuple5<Long, Integer, String, String, String>> out)
-				throws Exception {
-			try {
-				out.collect(new Tuple5<Long, Integer, String, String, String>(
-						getLong(value, "id"),
-						getInt(value, "entities.hashtags[0].indices[1]"),
-						getString(value, "lang"),
-						getString(value, "text"),
-						getString(value, "user.name")));
-			} catch (JSONException e) {
-				if (LOG.isErrorEnabled()) {
-					LOG.error("Field not found");
-				}
-			}
-		}
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		String path = new String();
-
-		if (args != null && args.length == 1) {
-			path = args[0];
-		} else {
-			System.err.println("USAGE:\nTwitterStreaming <pathToPropertiesFile>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM);
-
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
-				.setParallelism(SOURCE_PARALLELISM);
-
-		DataStream<Tuple5<Long, Integer, String, String, String>> selectedDataStream = streamSource
-				.flatMap(new SelectDataFlatMap());
-
-		selectedDataStream.addSink(new TwitterSink());
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
deleted file mode 100644
index b1fc92c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.streaming.connectors.twitter;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
-import org.apache.flink.util.Collector;
-import org.apache.sling.commons.json.JSONException;
-
-/**
-* This program demonstrate the use of TwitterSource.
-* Its aim is to count the frequency of the languages of tweets
-*/
-public class TwitterTopology {
-
-	private static final int NUMBEROFTWEETS = 100;
-
-	/**
-	 * FlatMapFunction to determine the language of tweets if possible
-	 */
-	public static class SelectLanguageFlatMap extends
-			JSONParseFlatMap<String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Select the language from the incoming JSON text
-		 */
-		@Override
-		public void flatMap(String value, Collector<String> out) throws Exception {
-			try{
-				out.collect(getString(value, "lang"));
-			}
-			catch (JSONException e){
-				out.collect("");
-			}
-		}
-
-	}
-
-	public static void main(String[] args) throws Exception {
-
-		String path = new String();
-
-		if (args != null && args.length == 1) {
-			path = args[0];
-		} else {
-			System.err.println("USAGE:\nTwitterLocal <pathToPropertiesFile>");
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
-
-
-		DataStream<Tuple2<String, Integer>> dataStream = streamSource
-				.flatMap(new SelectLanguageFlatMap())
-				.map(new MapFunction<String, Tuple2<String, Integer>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<String, Integer> map(String value) throws Exception {
-						return new Tuple2<String, Integer>(value, 1);
-					}
-				})
-				.keyBy(0)
-				.sum(1);
-
-		dataStream.print();
-
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
deleted file mode 100644
index 1ca4143..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/resources/twitter.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-secret=***
-consumerSecret=***
-token=***-***
-consumerKey=***

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
deleted file mode 100644
index b1d4115..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.Arrays;
-import java.util.Collection;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
-public class JSONParserTest {
-
-	private String jsonText;
-	private String searchedField;
-
-	public JSONParserTest(String text, String field) {
-		jsonText = text;
-		searchedField = field;
-	}
-
-	@Parameters
-	public static Collection<Object[]> initParameterList() {
-
-		Object[][] parameterList = new Object[][] { 
-				{ "{\"key\":\"value\"}", 							"key" },
-				{ "{\"key\":[\"value\"]}", 							"key[0]" },
-				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
-				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
-				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
-				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
-				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
-				};
-
-		return Arrays.asList(parameterList);
-	}
-
-	@Test
-	public void test() {
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-			String expected = "{\"retValue\":\"value\"}";
-
-			assertTrue(expected.equals(jo.toString()));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
deleted file mode 100644
index 8851086..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.flink.streaming.connectors.json.JSONParser;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.junit.Test;
-
-
-public class JSONParserTest2 {
-	
-	@Test
-	public void testGetBooleanFunction() {
-		String jsonText = "{\"key\":true}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertTrue(jo.getBoolean("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-	@Test
-	public void testGetDoubleFunction() {
-		double expected = 12345.12345;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getDouble("retValue"),0.000001);
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-	@Test
-	public void testGetIntFunction() {
-		int expected = 15;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getInt("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-
-	@Test
-	public void testGetLongFunction() {
-		long expected = 111111111111L;
-		String jsonText = "{\"key\":" + expected + "}";
-		String searchedField = "key";
-		try {
-			JSONParser parser = new JSONParser(jsonText);
-			JSONObject jo = parser.parse(searchedField);
-
-			assertEquals(expected,jo.getLong("retValue"));
-		} 
-		catch (JSONException e) {
-			fail();
-		}
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
deleted file mode 100644
index 9ede613..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
deleted file mode 100644
index 45b3b92..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.streaming" level="WARN"/>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
deleted file mode 100644
index 822ca26..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ /dev/null
@@ -1,66 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-streaming-parent</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-streaming-connectors-parent</artifactId>
-	<name>flink-streaming-connectors</name>
-
-	<packaging>pom</packaging>
-
-	<modules>
-		<module>flink-connector-flume</module>
-		<module>flink-connector-kafka</module>
-		<module>flink-connector-elasticsearch</module>
-		<module>flink-connector-rabbitmq</module>
-		<module>flink-connector-twitter</module>
-		<module>flink-connector-nifi</module>
-	</modules>
-
-	<!-- See main pom.xml for explanation of profiles -->
-	<profiles>
-		<profile>
-			<id>hadoop-2</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop2--><name>!hadoop.profile</name>
-				</property>
-			</activation>
-			<modules>
-				<!-- Include the flink-fs-tests project only for HD2.
-				 	The HDFS minicluster interfaces changed between the two versions.
-				 -->
-				<module>flink-connector-filesystem</module>
-			</modules>
-		</profile>
-	</profiles>
-
-</project>