Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/28 15:36:29 UTC

[3/7] flink git commit: [FLINK-1874] [streaming] Connectors separated into individual projects

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
new file mode 100644
index 0000000..4bc6df0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/main/java/org/apache/flink/streaming/connectors/twitter/TwitterTopology.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.twitter;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.json.JSONParseFlatMap;
+import org.apache.flink.util.Collector;
+import org.apache.sling.commons.json.JSONException;
+
+/**
+ * This program demonstrates the use of TwitterSource.
+ * Its aim is to count the frequency of the languages of incoming tweets.
+ */
+public class TwitterTopology {
+
+	private static final int NUMBER_OF_TWEETS = 100;
+	
+	/**
+	 * FlatMapFunction to determine the language of tweets if possible 
+	 */
+	public static class SelectLanguageFlatMap extends
+			JSONParseFlatMap<String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		/**
+		 * Select the language from the incoming JSON text
+		 */
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			try{
+				out.collect(getString(value, "lang"));
+			}
+			catch (JSONException e){
+				out.collect("");
+			}
+		}
+
+	}
+
+	public static void main(String[] args) throws Exception {
+
+		String path;
+
+		if (args != null && args.length == 1) {
+			path = args[0];
+		} else {
+			System.err.println("USAGE:\nTwitterTopology <pathToPropertiesFile>");
+			return;
+		}
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBER_OF_TWEETS));
+
+
+		DataStream<Tuple2<String, Integer>> dataStream = streamSource
+				.flatMap(new SelectLanguageFlatMap())
+				.map(new MapFunction<String, Tuple2<String, Integer>>() {
+					private static final long serialVersionUID = 1L;
+					
+					@Override
+					public Tuple2<String, Integer> map(String value) throws Exception {
+						return new Tuple2<String, Integer>(value, 1);
+					}
+				})
+				.groupBy(0)
+				.sum(1);
+
+		dataStream.print();
+
+		env.execute();
+	}
+}
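
A note for readers: the extraction in SelectLanguageFlatMap ultimately goes through JSONParser, which wraps the looked-up value in a JSONObject under the key "retValue"; getString(value, "lang") reads that wrapper. A minimal standalone sketch of that step, using a hypothetical tweet payload:

    import org.apache.flink.streaming.connectors.json.JSONParser;
    import org.apache.sling.commons.json.JSONException;
    import org.apache.sling.commons.json.JSONObject;

    public class LangExtractionSketch {
        public static void main(String[] args) throws JSONException {
            // Hypothetical tweet payload; real Twitter JSON carries many more fields.
            String tweet = "{\"id\":1,\"lang\":\"en\",\"text\":\"hello\"}";
            JSONParser parser = new JSONParser(tweet);
            JSONObject result = parser.parse("lang"); // value is wrapped under "retValue"
            System.out.println(result.getString("retValue")); // prints: en
        }
    }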

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
new file mode 100644
index 0000000..b1d4115
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class JSONParserTest {
+
+	private String jsonText;
+	private String searchedField;
+
+	public JSONParserTest(String text, String field) {
+		jsonText = text;
+		searchedField = field;
+	}
+
+	@Parameters
+	public static Collection<Object[]> initParameterList() {
+
+		Object[][] parameterList = new Object[][] { 
+				{ "{\"key\":\"value\"}", 							"key" },
+				{ "{\"key\":[\"value\"]}", 							"key[0]" },
+				{ "{\"key\":[{\"key\":\"value\"}]}", 				"key[0].key" },
+				{ "{\"key\":[{\"key\":[{\"key\":\"value\"}]}]}", 	"key[0].key[0].key"},
+				{ "{\"key\":[1,[{\"key\":\"value\"}]]}", 			"key[1][0].key" },
+				{ "{\"key\":[1,[[\"key\",2,\"value\"]]]}", 			"key[1][0][2]" },
+				{ "{\"key\":{\"key\":{\"otherKey\":\"wrongValue\",\"key\":\"value\"},\"otherKey\":\"wrongValue\"},\"otherKey\":\"wrongValue\"}" , "key.key.key"}
+				};
+
+		return Arrays.asList(parameterList);
+	}
+
+	@Test
+	public void test() {
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+			String expected = "{\"retValue\":\"value\"}";
+
+			assertEquals(expected, jo.toString());
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+}
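
The parameter list above doubles as documentation for the parser's path syntax: dots descend into nested objects and bracketed indices descend into arrays. As a one-line illustration of the fifth case above (a sketch, not part of the commit):

    // "key[1][0].key" applied to {"key":[1,[{"key":"value"}]]} selects array
    // index 1, then index 0, then the field "key", yielding "value".
    JSONObject jo = new JSONParser("{\"key\":[1,[{\"key\":\"value\"}]]}").parse("key[1][0].key");
    System.out.println(jo.getString("retValue")); // value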

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
new file mode 100644
index 0000000..8851086
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/java/org/apache/flink/streaming/connectors/json/JSONParserTest2.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.json;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.streaming.connectors.json.JSONParser;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.junit.Test;
+
+
+public class JSONParserTest2 {
+	
+	@Test
+	public void testGetBooleanFunction() {
+		String jsonText = "{\"key\":true}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertTrue(jo.getBoolean("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetDoubleFunction() {
+		double expected = 12345.12345;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected, jo.getDouble("retValue"), 0.000001);
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+	@Test
+	public void testGetIntFunction() {
+		int expected = 15;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected, jo.getInt("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+
+	@Test
+	public void testGetLongFunction() {
+		long expected = 111111111111L;
+		String jsonText = "{\"key\":" + expected + "}";
+		String searchedField = "key";
+		try {
+			JSONParser parser = new JSONParser(jsonText);
+			JSONObject jo = parser.parse(searchedField);
+
+			assertEquals(expected, jo.getLong("retValue"));
+		} 
+		catch (JSONException e) {
+			fail();
+		}
+	}
+	
+}
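
Since parse() always returns its result under the fixed key "retValue", the typed getters of JSONObject apply directly, which is exactly what these tests exercise. A minimal sketch combining the two steps:

    // Typed access: the boolean value is retrieved through JSONObject.getBoolean().
    boolean flag = new JSONParser("{\"key\":true}").parse("key").getBoolean("retValue"); // true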

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..9ede613
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/log4j-test.properties
@@ -0,0 +1,27 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..45b3b92
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-twitter/src/test/resources/logback-test.xml
@@ -0,0 +1,30 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <logger name="org.apache.flink.streaming" level="WARN"/>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 1ecf6c1..4b82d09 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -30,215 +30,16 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-streaming-connectors</artifactId>
+	<artifactId>flink-streaming-connectors-parent</artifactId>
 	<name>flink-streaming-connectors</name>
 
-	<packaging>jar</packaging>
+	<packaging>pom</packaging>
 
-	<!-- Allow users to pass custom connector versions -->
-	<properties>
-		<kafka.version>0.8.2.0</kafka.version>
-		<rabbitmq.version>3.3.1</rabbitmq.version>
-		<flume-ng.version>1.5.0</flume-ng.version>
-	</properties>
+	<modules>
+		<module>flink-connector-flume</module>
+		<module>flink-connector-kafka</module>
+		<module>flink-connector-rabbitmq</module>
+		<module>flink-connector-twitter</module>
+	</modules>
 
-	<dependencies>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-core</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.kafka</groupId>
-			<artifactId>kafka_${scala.binary.version}</artifactId>
-			<version>${kafka.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>com.sun.jmx</groupId>
-					<artifactId>jmxri</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.sun.jdmk</groupId>
-					<artifactId>jmxtools</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>net.sf.jopt-simple</groupId>
-					<artifactId>jopt-simple</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-reflect</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.scala-lang</groupId>
-					<artifactId>scala-compiler</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.yammer.metrics</groupId>
-					<artifactId>metrics-annotation</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.curator</groupId>
-			<artifactId>curator-test</artifactId>
-			<version>2.7.1</version>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>com.rabbitmq</groupId>
-			<artifactId>amqp-client</artifactId>
-			<version>${rabbitmq.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flume</groupId>
-			<artifactId>flume-ng-core</artifactId>
-			<version>${flume-ng.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.slf4j</groupId>
-					<artifactId>slf4j-log4j12</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>log4j</groupId>
-					<artifactId>log4j</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-io</groupId>
-					<artifactId>commons-io</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-codec</groupId>
-					<artifactId>commons-codec</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-cli</groupId>
-					<artifactId>commons-cli</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-lang</groupId>
-					<artifactId>commons-lang</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-core-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.codehaus.jackson</groupId>
-					<artifactId>jackson-mapper-asl</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.thoughtworks.paranamer</groupId>
-					<artifactId>paranamer</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.xerial.snappy</groupId>
-					<artifactId>snappy-java</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.tukaani</groupId>
-					<artifactId>xz</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.velocity</groupId>
-					<artifactId>velocity</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>commons-collections</groupId>
-					<artifactId>commons-collections</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>servlet-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty-util</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.mortbay.jetty</groupId>
-					<artifactId>jetty</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.google.code.gson</groupId>
-					<artifactId>gson</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.thrift</groupId>
-					<artifactId>libthrift</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>com.twitter</groupId>
-			<artifactId>hbc-core</artifactId>
-			<version>2.2.0</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.sling</groupId>
-			<artifactId>org.apache.sling.commons.json</artifactId>
-			<version>2.0.6</version>
-		</dependency>
-
-	</dependencies>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-jar-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>test-jar</goal>
-						</goals>
-					</execution>
-				</executions>
-			</plugin>
-			<plugin>
-				<!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>2.3</version>
-				<executions>
-					<execution>
-						<id>shade-flink</id>
-						<configuration>
-							<artifactSet>
-								<includes combine.children="append">
-									<!-- We include all dependencies that transitively depend on guava -->
-									<include>org.apache.flume:*</include>
-									<include>com.twitter:hbc-core</include>
-									<include>com.twitter:joauth</include>
-								</includes>
-							</artifactSet>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
 </project>
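
With this split, downstream builds can depend on just the connector they need instead of the former monolithic flink-streaming-connectors jar. A sketch of the resulting dependency declaration; the artifactId follows the module names listed above, and the version property is a placeholder:

    <!-- Hypothetical consumer pom snippet: depend on a single connector module. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>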

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
deleted file mode 100644
index fde4cdf..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-
-public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>{
-
-	private static final long serialVersionUID = 1L;
-	protected DeserializationSchema<OUT> schema;
-
-	public ConnectorSource(DeserializationSchema<OUT> schema) {
-		this.schema = schema;
-	}
-
-	@Override
-	public TypeInformation<OUT> getProducedType() {
-		return schema.getProducedType();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
deleted file mode 100644
index 50f5770..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.flume;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flume.Event;
-import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
-import org.apache.flume.api.RpcClient;
-import org.apache.flume.api.RpcClientFactory;
-import org.apache.flume.event.EventBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FlumeSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
-
-	private transient FlinkRpcClientFacade client;
-	boolean initDone = false;
-	String host;
-	int port;
-	SerializationSchema<IN, byte[]> schema;
-
-	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
-		this.host = host;
-		this.port = port;
-		this.schema = schema;
-	}
-
-	/**
-	 * Receives tuples from the Apache Flink {@link DataStream} and forwards
-	 * them to Apache Flume.
-	 * 
-	 * @param value
-	 *            The tuple arriving from the datastream
-	 */
-	@Override
-	public void invoke(IN value) {
-
-		byte[] data = schema.serialize(value);
-		client.sendDataToFlume(data);
-
-	}
-
-	private class FlinkRpcClientFacade {
-		private RpcClient client;
-		private String hostname;
-		private int port;
-
-		/**
-		 * Initializes the connection to Apache Flume.
-		 * 
-		 * @param hostname
-		 *            The host
-		 * @param port
-		 *            The port.
-		 */
-		public void init(String hostname, int port) {
-			// Setup the RPC connection
-			this.hostname = hostname;
-			this.port = port;
-			int initCounter = 0;
-			while (true) {
-				if (initCounter >= 90) {
-					throw new RuntimeException("Cannot establish connection with " + port + " at "
-							+ host);
-				}
-				try {
-					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
-				} catch (FlumeException e) {
-					// Wait one second if the connection failed before the next
-					// try
-					try {
-						Thread.sleep(1000);
-					} catch (InterruptedException e1) {
-						if (LOG.isErrorEnabled()) {
-							LOG.error("Interrupted while trying to connect {} at {}", port, host);
-						}
-					}
-				}
-				if (client != null) {
-					break;
-				}
-				initCounter++;
-			}
-			initDone = true;
-		}
-
-		/**
-		 * Sends byte arrays as {@link Event} series to Apache Flume.
-		 * 
-		 * @param data
-	 *            The byte array to send to Apache Flume
-		 */
-		public void sendDataToFlume(byte[] data) {
-			Event event = EventBuilder.withBody(data);
-
-			try {
-				client.append(event);
-
-			} catch (EventDeliveryException e) {
-				// clean up and recreate the client
-				client.close();
-				client = null;
-				client = RpcClientFactory.getDefaultInstance(hostname, port);
-			}
-		}
-
-	}
-
-	@Override
-	public void close() {
-		client.client.close();
-	}
-
-	@Override
-	public void open(Configuration config) {
-		client = new FlinkRpcClientFacade();
-		client.init(host, port);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
deleted file mode 100644
index bb9ce38..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ /dev/null
@@ -1,149 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import java.util.List;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.connectors.ConnectorSource;
-//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-//import org.apache.flink.util.Collector;
-//import org.apache.flume.Context;
-//import org.apache.flume.channel.ChannelProcessor;
-//import org.apache.flume.source.AvroSource;
-//import org.apache.flume.source.avro.AvroFlumeEvent;
-//import org.apache.flume.source.avro.Status;
-//
-//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
-//	private static final long serialVersionUID = 1L;
-//
-//	String host;
-//	String port;
-//	volatile boolean finished = false;
-//
-//	private volatile boolean isRunning = false;
-//
-//	FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
-//		super(deserializationSchema);
-//		this.host = host;
-//		this.port = Integer.toString(port);
-//	}
-//
-//	public class MyAvroSource extends AvroSource {
-//		Collector<OUT> output;
-//
-//		/**
-//		 * Sends the AvroFlumeEvent from its argument list to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param avroEvent
-//		 *            The event that should be sent to the dataStream
-//		 * @return A {@link Status}.OK message if sending the event was
-//		 *         successful.
-//		 */
-//		@Override
-//		public Status append(AvroFlumeEvent avroEvent) {
-//			collect(avroEvent);
-//			return Status.OK;
-//		}
-//
-//		/**
-//		 * Sends the AvroFlumeEvents from its argument list to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param events
-//		 *            The events that are sent to the dataStream
-//		 * @return A Status.OK message if sending the events was successful.
-//		 */
-//		@Override
-//		public Status appendBatch(List<AvroFlumeEvent> events) {
-//			for (AvroFlumeEvent avroEvent : events) {
-//				collect(avroEvent);
-//			}
-//
-//			return Status.OK;
-//		}
-//
-//		/**
-//		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
-//		 * {@link DataStream}.
-//		 *
-//		 * @param avroEvent
-//		 *            The event that is sent to the dataStream
-//		 */
-//		private void collect(AvroFlumeEvent avroEvent) {
-//			byte[] b = avroEvent.getBody().array();
-//			OUT out = FlumeSource.this.schema.deserialize(b);
-//
-//			if (schema.isEndOfStream(out)) {
-//				FlumeSource.this.finished = true;
-//				this.stop();
-//				FlumeSource.this.notifyAll();
-//			} else {
-//				output.collect(out);
-//			}
-//
-//		}
-//
-//	}
-//
-//	MyAvroSource avroSource;
-//
-//	/**
-//	 * Configures the AvroSource. Also sets the output so the application can
-//	 * use it from outside of the invoke function.
-//	 *
-//	 * @param output
-//	 *            The output used in the invoke function
-//	 */
-//	public void configureAvroSource(Collector<OUT> output) {
-//
-//		avroSource = new MyAvroSource();
-//		avroSource.output = output;
-//		Context context = new Context();
-//		context.put("port", port);
-//		context.put("bind", host);
-//		avroSource.configure(context);
-//		// An instance of a ChannelProcessor is required for configuring the
-//		// avroSource although it will not be used in this case.
-//		ChannelProcessor cp = new ChannelProcessor(null);
-//		avroSource.setChannelProcessor(cp);
-//	}
-//
-//	/**
-//	 * Configures the AvroSource and runs until the user calls a close function.
-//	 *
-//	 * @param output
-//	 *            The Collector for sending data to the datastream
-//	 */
-//	@Override
-//	public void run(Collector<OUT> output) throws Exception {
-//		isRunning = true;
-//		configureAvroSource(output);
-//		avroSource.start();
-//		while (!finished && isRunning) {
-//			this.wait();
-//		}
-//	}
-//
-//	@Override
-//	public void cancel() {
-//		isRunning = false;
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
deleted file mode 100644
index f630bce..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ /dev/null
@@ -1,49 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one or more
-// * contributor license agreements.  See the NOTICE file distributed with
-// * this work for additional information regarding copyright ownership.
-// * The ASF licenses this file to You under the Apache License, Version 2.0
-// * (the "License"); you may not use this file except in compliance with
-// * the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.connectors.flume;
-//
-//import org.apache.flink.streaming.api.datastream.DataStream;
-//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-//import org.apache.flink.streaming.util.serialization.SerializationSchema;
-//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
-//
-//public class FlumeTopology {
-//
-//	public static void main(String[] args) throws Exception {
-//
-//		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-//
-//		@SuppressWarnings("unused")
-//		DataStream<String> dataStream1 = env.addSource(
-//				new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
-//				new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
-//
-//		env.execute();
-//	}
-//
-//	public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
-//
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public byte[] serialize(String element) {
-//			return element.getBytes();
-//		}
-//	}
-//
-//}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
deleted file mode 100644
index 0f16541..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParseFlatMap.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.sling.commons.json.JSONException;
-
-/**
- * Abstract class derived from {@link RichFlatMapFunction} to handle JSON files.
- * 
- * @param <IN>
- *            Type of the input elements.
- * @param <OUT>
- *            Type of the returned elements.
- */
-public abstract class JSONParseFlatMap<IN, OUT> extends RichFlatMapFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	// private static final Log LOG = LogFactory.getLog(JSONParseFlatMap.class);
-
-	/**
-	 * Get the value object associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The object associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public Object get(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).get("retValue");
-	}
-
-	/**
-	 * Get the boolean value associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The boolean value associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public boolean getBoolean(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getBoolean("retValue");
-	}
-
-	/**
-	 * Get the double value associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The double value associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public double getDouble(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getDouble("retValue");
-	}
-
-	/**
-	 * Get the int value associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The int value associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public int getInt(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getInt("retValue");
-	}
-
-	/**
-	 * Get the long value associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The long value associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public long getLong(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-
-		return parser.parse(field).getLong("retValue");
-	}
-	
-	/**
-	 * Get the String value associated with a key from a JSON code. It can find
-	 * embedded fields, too.
-	 * 
-	 * @param jsonText
-	 *            JSON String in which the field is searched.
-	 * @param field
-	 *            The key whose value is searched for.
-	 * @return The String value associated with the field.
-	 * @throws JSONException
-	 *             If the field is not found.
-	 */
-	public String getString(String jsonText, String field) throws JSONException {
-		JSONParser parser = new JSONParser(jsonText);
-		
-		return parser.parse(field).getString("retValue");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
deleted file mode 100644
index c1eabbd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/json/JSONParser.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.json;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.StringTokenizer;
-
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-
-/**
- * A JSONParser contains a JSONObject and provides a convenient way to access
- * embedded fields in JSON code.
- */
-public class JSONParser {
-
-	private JSONObject originalJO;
-	private String searchedfield;
-	private Object temp;
-
-	/**
-	 * Construct a JSONParser from a string. The string has to be a JSON code
-	 * from which we want to get a field.
-	 * 
-	 * @param jsonText
-	 *            A string containing JSON code, i.e. the string representation
-	 *            of a JSON object.
-	 * @throws JSONException
-	 *             If there is a syntax error in the source string.
-	 */
-	public JSONParser(String jsonText) throws JSONException {
-		originalJO = new JSONObject(jsonText);
-	}
-
-	/**
-	 * 
-	 * Parse the JSON code passed to the constructor to find the given key.
-	 * 
-	 * @param key
-	 *            The key whose value is searched for.
-	 * @return A JSONObject which has only one field called "retValue" and the
-	 *         value associated to it is the searched value. The methods of
-	 *         JSONObject can be used to get the field value in a desired
-	 *         format.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	public JSONObject parse(String key) throws JSONException {
-		initializeParser(key);
-		parsing();
-		return putResultInJSONObj();
-	}
-
-	/**
-	 * Prepare the fields of the class for parsing.
-	 * 
-	 * @param key
-	 *            The key whose value is searched for.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void initializeParser(String key) throws JSONException {
-		searchedfield = key;
-		temp = new JSONObject(originalJO.toString());
-	}
-
-	/**
-	 * This function goes through the given field and calls the appropriate
-	 * functions to handle the segments between the separator characters.
-	 * 
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void parsing() throws JSONException {
-		StringTokenizer st = new StringTokenizer(searchedfield, ".");
-		while (st.hasMoreTokens()) {
-			find(st.nextToken());
-		}
-	}
-
-	/**
-	 * Search for the next part of the field and update the state if it was
-	 * found.
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @throws JSONException
-	 *             If the key is not found.
-	 */
-	private void find(String nextToken) throws JSONException {
-		if (endsWithBracket(nextToken)) {
-			treatAllBracket(nextToken);
-		} else {
-			temp = ((JSONObject) temp).get(nextToken);
-		}
-	}
-
-	/**
-	 * Determine whether the given string ends with a closing square bracket ']'
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @return True if the given string ends with a closing square bracket ']'
-	 *         and false otherwise.
-	 */
-	private boolean endsWithBracket(String nextToken) {
-		return nextToken.endsWith("]");
-	}
-
-	/**
-	 * Handle (multidimensional) arrays. Treat the square bracket pairs one
-	 * after the other if necessary.
-	 * 
-	 * @param nextToken
-	 *            The current part of the searched field.
-	 * @throws JSONException
-	 *             If the searched element is not found.
-	 */
-	private void treatAllBracket(String nextToken) throws JSONException {
-		List<String> list = Arrays.asList(nextToken.split("\\["));
-		ListIterator<String> iter = list.listIterator();
-
-		temp = ((JSONObject) temp).get(iter.next());
-
-		while (iter.hasNext()) {
-			int index = Integer.parseInt(cutBracket(iter.next()));
-			temp = ((JSONArray) temp).get(index);
-		}
-	}
-
-	/**
-	 * Remove the last character of the string.
-	 * 
-	 * @param string
-	 *            String to modify.
-	 * @return The given string without the last character.
-	 */
-	private String cutBracket(String string) {
-		return string.substring(0, string.length() - 1);
-	}
-
-	/**
-	 * Save the result of the search into a JSONObject.
-	 * 
-	 * @return A special JSONObject which contains only one key. The value
-	 *         associated to this key is the result of the search.
-	 * @throws JSONException
-	 *             If there is a problem creating the JSONObject. (e.g. invalid
-	 *             syntax)
-	 */
-	private JSONObject putResultInJSONObj() throws JSONException {
-		JSONObject jo = new JSONObject();
-		jo.put("retValue", temp);
-		return jo;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
deleted file mode 100644
index fe6684d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaConsumerExample {
-
-	private static String host;
-	private static int port;
-	private static String topic;
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-
-		DataStream<String> kafkaStream = env
-				.addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
-		kafkaStream.print();
-
-		env.execute();
-	}
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 3) {
-			host = args[0];
-			port = Integer.parseInt(args[1]);
-			topic = args[2];
-			return true;
-		} else {
-			System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
-			return false;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
deleted file mode 100644
index 4dd5577..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
-
-public class KafkaProducerExample {
-
-	private static String host;
-	private static int port;
-	private static String topic;
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
-
-		@SuppressWarnings({ "unused", "serial" })
-		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
-
-			private int index = 0;
-
-			@Override
-			public boolean reachedEnd() throws Exception {
-				return index >= 20;
-			}
-
-			@Override
-			public String next() throws Exception {
-				if (index < 20) {
-					String result = "message #" + index;
-					index++;
-					return result;
-				}
-
-				return "q";
-			}
-
-		}).addSink(
-				new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
-		)
-		.setParallelism(3);
-
-		env.execute();
-	}
-
-	private static boolean parseParameters(String[] args) {
-		if (args.length == 3) {
-			host = args[0];
-			port = Integer.parseInt(args[1]);
-			topic = args[2];
-			return true;
-		} else {
-			System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
-			return false;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
deleted file mode 100644
index 4286196..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
-import org.apache.flink.runtime.util.DataOutputSerializer;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-
-import java.io.IOException;
-
-public class Utils {
-	public static class TypeInformationSerializationSchema<T>
-			implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
-		private final TypeSerializer<T> serializer;
-		private final TypeInformation<T> ti;
-
-		@SuppressWarnings("unchecked")
-		public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
-			// Derive the type information from a sample object; the cast to T is unchecked.
-			this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
-			this.serializer = ti.createSerializer(ec);
-		}
-
-		@Override
-		public T deserialize(byte[] message) {
-			try {
-				return serializer.deserialize(new ByteArrayInputView(message));
-			} catch (IOException e) {
-				throw new RuntimeException("Unable to deserialize message", e);
-			}
-		}
-
-		@Override
-		public boolean isEndOfStream(T nextElement) {
-			return false;
-		}
-
-		@Override
-		public byte[] serialize(T element) {
-			// The buffer starts at 16 bytes and grows as needed.
-			DataOutputSerializer dos = new DataOutputSerializer(16);
-			try {
-				serializer.serialize(element, dos);
-			} catch (IOException e) {
-				throw new RuntimeException("Unable to serialize record", e);
-			}
-			// Note: getByteArray() exposes the whole backing array, which may be longer than
-			// the bytes actually written; deserialize() only reads the prefix it needs.
-			return dos.getByteArray();
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return ti;
-		}
-	}
-}
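
The schema above lets Kafka sources and sinks reuse Flink's own type serializers instead of
hand-written byte conversions. A minimal round-trip sketch; the sample Tuple2 value and the
freshly created ExecutionConfig are illustrative only:

	import org.apache.flink.api.common.ExecutionConfig;
	import org.apache.flink.api.java.tuple.Tuple2;

	ExecutionConfig ec = new ExecutionConfig();
	Tuple2<String, Integer> sample = new Tuple2<String, Integer>("flink", 1);

	// The constructor extracts the type information from the sample object.
	Utils.TypeInformationSerializationSchema<Tuple2<String, Integer>> schema =
			new Utils.TypeInformationSerializationSchema<Tuple2<String, Integer>>(sample, ec);

	byte[] bytes = schema.serialize(sample);                  // Flink's Tuple2 serializer writes the record
	Tuple2<String, Integer> back = schema.deserialize(bytes); // ... and reads it back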

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
deleted file mode 100644
index 0965b29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
-import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.util.NetUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-
-
-/**
- * Sink that emits its inputs to a Kafka topic.
- *
- * @param <IN>
- * 		Type of the sink input
- */
-public class KafkaSink<IN> extends RichSinkFunction<IN> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
-
-	private Producer<IN, byte[]> producer;
-	private Properties userDefinedProperties;
-	private String topicId;
-	private String brokerList;
-	private SerializationSchema<IN, byte[]> schema;
-	private SerializableKafkaPartitioner partitioner;
-	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 *			Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(brokerList, topicId, new Properties(), serializationSchema);
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with custom Producer configuration.
-	 * Entries in the custom configuration overwrite the defaults that the sink derives from
-	 * its other arguments (including "metadata.broker.list").
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param producerConfig
-	 * 		Configurations of the Kafka producer
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 */
-	public KafkaSink(String brokerList, String topicId, Properties producerConfig,
-			SerializationSchema<IN, byte[]> serializationSchema) {
-		String[] elements = brokerList.split(",");
-		for (String broker : elements) {
-			NetUtils.ensureCorrectHostnamePort(broker);
-		}
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-
-		this.brokerList = brokerList;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
-		this.partitionerClass = null;
-		this.userDefinedProperties = producerConfig;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input to
-	 * the topic.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
-	 */
-	public KafkaSink(String brokerList, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		ClosureCleaner.ensureSerializable(partitioner);
-		this.partitioner = partitioner;
-	}
-
-	/**
-	 * Creates a KafkaSink for a given topic with a user defined partitioner class.
-	 *
-	 * @param brokerList
-	 * 		Addresses of the brokers
-	 * @param topicId
-	 * 		ID of the Kafka topic.
-	 * @param serializationSchema
-	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		Class of the user defined partitioner; it is instantiated by Kafka.
-	 */
-	public KafkaSink(String brokerList,
-			String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema,
-			Class<? extends SerializableKafkaPartitioner> partitioner) {
-		this(brokerList, topicId, serializationSchema);
-		this.partitionerClass = partitioner;
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	@Override
-	public void open(Configuration configuration) {
-
-		Properties properties = new Properties();
-
-		properties.put("metadata.broker.list", brokerList);
-		properties.put("request.required.acks", "-1");
-		properties.put("message.send.max.retries", "10");
-
-		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		// this will not be used as the key will not be serialized
-		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
-
-		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
-			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
-		}
-
-		if (partitioner != null) {
-			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
-			// java serialization will do the rest.
-			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
-		}
-		if (partitionerClass != null) {
-			properties.put("partitioner.class", partitionerClass);
-		}
-
-		ProducerConfig config = new ProducerConfig(properties);
-
-		try {
-			producer = new Producer<IN, byte[]>(config);
-		} catch (NullPointerException e) {
-			// The producer constructor may fail with an NPE when the brokers are unreachable;
-			// translate it into a clearer error message.
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
-		}
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Kafka.
-	 *
-	 * @param next
-	 * 		The incoming data
-	 */
-	@Override
-	public void invoke(IN next) {
-		byte[] serialized = schema.serialize(next);
-
-		// Send with a null key (nothing is serialized for the key); the element itself is
-		// passed only as the non-serialized partition key.
-		producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
-	}
-
-	@Override
-	public void close() {
-		if (producer != null) {
-			producer.close();
-		}
-	}
-
-}
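
As the constructors above show, a job can control partitioning by handing the sink either a
partitioner class or a serializable partitioner instance. A minimal sketch, assuming a broker
at localhost:9092, a hypothetical topic "events", and a DataStream<String> named stream
(SerializableKafkaPartitioner combines Kafka's Partitioner with java.io.Serializable):

	public class HashPartitioner implements SerializableKafkaPartitioner {
		private static final long serialVersionUID = 1L;

		@Override
		public int partition(Object key, int numberOfPartitions) {
			// Deterministic, non-negative assignment based on the element's hash code.
			return (key.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
		}
	}

	stream.addSink(new KafkaSink<String>("localhost:9092", "events",
			new JavaDefaultStringSchema(), new HashPartitioner()));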

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
deleted file mode 100644
index 00666f6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.api;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import com.google.common.base.Preconditions;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Source that listens to a Kafka topic using the high level Kafka API.
- * 
- * @param <OUT>
- *            Type of the messages on the topic.
- */
-public class KafkaSource<OUT> extends ConnectorSource<OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
-
-	private final String zookeeperAddress;
-	private final String groupId;
-	private final String topicId;
-	private Properties customProperties;
-
-	private transient ConsumerConnector consumer;
-	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
-
-	private long zookeeperSyncTimeMillis;
-	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
-	private static final String DEFAULT_GROUP_ID = "flink-group";
-
-	// reachedEnd() must read ahead to detect the end of the stream; the element it reads
-	// is buffered here so that next() can return it.
-	private OUT nextElement;
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param groupId
-	 * 			   ID of the consumer group.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 */
-	public KafkaSource(String zookeeperAddress,
-					String topicId, String groupId,
-					DeserializationSchema<OUT> deserializationSchema,
-					long zookeeperSyncTimeMillis) {
-		this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
-	}
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 * 
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param groupId
-	 * 			   ID of the consumer group.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 * @param customProperties
-	 * 			  Custom properties for Kafka
-	 */
-	public KafkaSource(String zookeeperAddress,
-					String topicId, String groupId,
-					DeserializationSchema<OUT> deserializationSchema,
-					long zookeeperSyncTimeMillis, Properties customProperties) {
-		super(deserializationSchema);
-		Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
-		Preconditions.checkNotNull(topicId, "Topic ID is null");
-		Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
-		Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be non-negative");
-
-		this.zookeeperAddress = zookeeperAddress;
-		this.groupId = groupId;
-		this.topicId = topicId;
-		this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
-		this.customProperties = customProperties;
-	}
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
-	 */
-	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
-		this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
-	}
-
-	/**
-	 * Creates a KafkaSource that consumes a topic.
-	 *
-	 * @param zookeeperAddress
-	 *            Address of the Zookeeper host (with port number).
-	 * @param topicId
-	 *            ID of the Kafka topic.
-	 * @param deserializationSchema
-	 *            User defined deserialization schema.
-	 */
-	public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
-		this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
-	}
-
-	/**
-	 * Initializes the connection to Kafka.
-	 */
-	private void initializeConnection() {
-		Properties props = new Properties();
-		props.put("zookeeper.connect", zookeeperAddress);
-		props.put("group.id", groupId);
-		props.put("zookeeper.session.timeout.ms", "10000");
-		props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
-		props.put("auto.commit.interval.ms", "1000");
-
-		if (customProperties != null) {
-			for (Map.Entry<Object, Object> e : customProperties.entrySet()) {
-				if (props.containsKey(e.getKey())) {
-					LOG.warn("Overwriting property " + e.getKey() + " with value " + e.getValue());
-				}
-				props.put(e.getKey(), e.getValue());
-			}
-		}
-
-		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-
-		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
-				.createMessageStreams(Collections.singletonMap(topicId, 1));
-		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
-		KafkaStream<byte[], byte[]> stream = streams.get(0);
-
-		consumer.commitOffsets();
-
-		consumerIterator = stream.iterator();
-	}
-
-	@Override
-	public void open(Configuration config) throws Exception {
-		super.open(config);
-		initializeConnection();
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-		if (consumer != null) {
-			consumer.shutdown();
-		}
-	}
-
-	@Override
-	public boolean reachedEnd() throws Exception {
-		if (nextElement != null) {
-			return false;
-		}
-		// With the high level consumer API, hasNext() blocks until a message is available.
-		if (consumerIterator.hasNext()) {
-			OUT out = schema.deserialize(consumerIterator.next().message());
-			if (schema.isEndOfStream(out)) {
-				return true;
-			}
-			nextElement = out;
-		}
-		return false;
-	}
-
-	@Override
-	public OUT next() throws Exception {
-		if (!reachedEnd()) {
-			OUT result = nextElement;
-			nextElement = null;
-			return result;
-		} else {
-			throw new RuntimeException("Source exhausted");
-		}
-	}
-
-}
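
On the consuming side, the source plugs into a job like any other. A minimal sketch, assuming
ZooKeeper at localhost:2181, the same hypothetical "events" topic, and that
JavaDefaultStringSchema is used for deserialization as well:

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// Uses the default consumer group ("flink-group") and default ZooKeeper sync time.
	DataStream<String> messages = env.addSource(
			new KafkaSource<String>("localhost:2181", "events", new JavaDefaultStringSchema()));

	messages.print();
	env.execute();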

http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
deleted file mode 100644
index 7ae17df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.api.config;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Hacky wrapper to pass an object instance through a Properties map.
- *
- * This works as follows:
- * The recommended way of creating a KafkaSink is to specify a class name for the partitioner.
- *
- * Otherwise (if the user supplied a serializable partitioner instance), we register Flink's
- * PartitionerWrapper class as the partitioner with Kafka, via the key-value
- * (java.util.Properties) map. In addition, we use Properties.put(Object, Object) to store the
- * partitioner instance itself. This is a hack because put() goes through the underlying
- * Hashtable, which, unlike the String-only Properties accessors, accepts arbitrary objects.
- *
- * Kafka then instantiates this PartitionerWrapper with those Properties; the constructor
- * extracts the wrapped Partitioner instance, and all partition() calls are delegated to it.
- */
-public class PartitionerWrapper implements Partitioner {
-	public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
-
-	private Partitioner wrapped;
-
-	public PartitionerWrapper(VerifiableProperties properties) {
-		wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
-	}
-
-	@Override
-	public int partition(Object value, int numberOfPartitions) {
-		return wrapped.partition(value, numberOfPartitions);
-	}
-}
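
Concretely, the hand-off described in the class comment has two sides (the property keys are
the ones KafkaSink uses above):

	// Flink side (KafkaSink.open): register the wrapper as Kafka's partitioner and
	// put the user's partitioner instance into the same Properties map.
	properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
	properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);

	// Kafka side: instantiates PartitionerWrapper(VerifiableProperties); the constructor
	// pulls the instance back out, and every partition() call is delegated to it.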