Posted to commits@flink.apache.org by mb...@apache.org on 2015/05/28 15:36:31 UTC
[5/7] flink git commit: [FLINK-1874] [streaming] Connectors separated
into individual projects
[FLINK-1874] [streaming] Connectors separated into individual projects
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665bcec7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665bcec7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665bcec7
Branch: refs/heads/master
Commit: 665bcec779ef16e9ae2ba535631765d329f81392
Parents: 72828b5
Author: mbalassi <mb...@apache.org>
Authored: Sat May 23 10:50:58 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Thu May 28 14:44:22 2015 +0200
----------------------------------------------------------------------
docs/apis/streaming_guide.md | 30 +-
.../flink-connector-flume/pom.xml | 171 +++
.../streaming/connectors/flume/FlumeSink.java | 141 +++
.../streaming/connectors/flume/FlumeSource.java | 149 +++
.../connectors/flume/FlumeTopology.java | 49 +
.../flink-connector-kafka/pom.xml | 103 ++
.../connectors/kafka/KafkaConsumerExample.java | 58 +
.../connectors/kafka/KafkaProducerExample.java | 80 ++
.../flink/streaming/connectors/kafka/Utils.java | 70 ++
.../connectors/kafka/api/KafkaSink.java | 193 ++++
.../connectors/kafka/api/KafkaSource.java | 219 ++++
.../kafka/api/config/PartitionerWrapper.java | 49 +
.../api/persistent/PersistentKafkaSource.java | 338 ++++++
.../partitioner/KafkaConstantPartitioner.java | 33 +
.../SerializableKafkaPartitioner.java | 24 +
.../flink/streaming/kafka/KafkaITCase.java | 1023 ++++++++++++++++++
.../kafka/util/KafkaLocalSystemTime.java | 48 +
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-connector-rabbitmq/pom.xml | 59 +
.../streaming/connectors/rabbitmq/RMQSink.java | 111 ++
.../connectors/rabbitmq/RMQSource.java | 138 +++
.../connectors/rabbitmq/RMQTopology.java | 52 +
.../flink-connector-twitter/pom.xml | 95 ++
.../connectors/json/JSONParseFlatMap.java | 144 +++
.../streaming/connectors/json/JSONParser.java | 175 +++
.../connectors/twitter/TwitterSource.java | 322 ++++++
.../connectors/twitter/TwitterStreaming.java | 99 ++
.../connectors/twitter/TwitterTopology.java | 92 ++
.../connectors/json/JSONParserTest.java | 74 ++
.../connectors/json/JSONParserTest2.java | 95 ++
.../src/test/resources/log4j-test.properties | 27 +
.../src/test/resources/logback-test.xml | 30 +
.../flink-streaming-connectors/pom.xml | 215 +---
.../streaming/connectors/ConnectorSource.java | 38 -
.../streaming/connectors/flume/FlumeSink.java | 141 ---
.../streaming/connectors/flume/FlumeSource.java | 149 ---
.../connectors/flume/FlumeTopology.java | 49 -
.../connectors/json/JSONParseFlatMap.java | 144 ---
.../streaming/connectors/json/JSONParser.java | 175 ---
.../connectors/kafka/KafkaConsumerExample.java | 58 -
.../connectors/kafka/KafkaProducerExample.java | 80 --
.../flink/streaming/connectors/kafka/Utils.java | 70 --
.../connectors/kafka/api/KafkaSink.java | 193 ----
.../connectors/kafka/api/KafkaSource.java | 219 ----
.../kafka/api/config/PartitionerWrapper.java | 49 -
.../api/persistent/PersistentKafkaSource.java | 338 ------
.../partitioner/KafkaConstantPartitioner.java | 33 -
.../SerializableKafkaPartitioner.java | 24 -
.../streaming/connectors/rabbitmq/RMQSink.java | 111 --
.../connectors/rabbitmq/RMQSource.java | 138 ---
.../connectors/rabbitmq/RMQTopology.java | 52 -
.../connectors/twitter/TwitterSource.java | 322 ------
.../connectors/twitter/TwitterStreaming.java | 99 --
.../connectors/twitter/TwitterTopology.java | 92 --
.../connectors/json/JSONParserTest.java | 74 --
.../connectors/json/JSONParserTest2.java | 95 --
.../streaming/connectors/kafka/KafkaITCase.java | 1023 ------------------
.../kafka/util/KafkaLocalSystemTime.java | 48 -
.../src/test/resources/log4j-test.properties | 27 -
.../src/test/resources/logback-test.xml | 30 -
.../api/functions/source/ConnectorSource.java | 38 +
flink-staging/pom.xml | 12 +-
63 files changed, 4397 insertions(+), 4087 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index ab23695..6565fbf 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1213,7 +1213,15 @@ To run an application using one of these connectors usually additional third par
### Apache Kafka
-This connector provides access to data streams from [Apache Kafka](https://kafka.apache.org/).
+This connector provides access to data streams from [Apache Kafka](https://kafka.apache.org/). To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
#### Installing Apache Kafka
* Follow the instructions from [Kafka's quickstart](https://kafka.apache.org/documentation.html#quickstart) to download the code and launch a server (launching a Zookeeper and a Kafka server is required every time before starting the application).
@@ -1433,7 +1441,15 @@ More on Flume can be found [here](http://flume.apache.org).
### RabbitMQ
-This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/).
+This connector provides access to data streams from [RabbitMQ](http://www.rabbitmq.com/). To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-rabbitmq</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
##### Installing RabbitMQ
Follow the instructions from the [RabbitMQ download page](http://www.rabbitmq.com/download.html). After the installation the server automatically starts and the application connecting to RabbitMQ can be launched.
@@ -1497,7 +1513,15 @@ More about RabbitMQ can be found [here](http://www.rabbitmq.com/).
### Twitter Streaming API
-Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream.
+Twitter Streaming API provides opportunity to connect to the stream of tweets made available by Twitter. Flink Streaming comes with a built-in `TwitterSource` class for establishing a connection to this stream. To use this connector, add the following dependency to your project:
+
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-twitter</artifactId>
+ <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
#### Authentication
In order to connect to Twitter stream the user has to register their program and acquire the necessary information for the authentication. The process is described below.
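The connector classes added in this commit compose like any other source or sink. As a minimal sketch (illustrative only, not part of this commit's docs; the topic names, Zookeeper address, and broker address are placeholders), a pipeline that reads strings from one Kafka topic and writes them to another:

{% highlight java %}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class KafkaRoundTrip {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // KafkaSource connects through Zookeeper; KafkaSink talks to the broker list directly.
        DataStream<String> in = env.addSource(
                new KafkaSource<String>("localhost:2181", "input-topic", new JavaDefaultStringSchema()));

        in.addSink(new KafkaSink<String>("localhost:9092", "output-topic", new JavaDefaultStringSchema()));

        env.execute();
    }
}
{% endhighlight %}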
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
new file mode 100644
index 0000000..9ed9777
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/pom.xml
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-flume</artifactId>
+ <name>flink-connector-flume</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <flume-ng.version>1.5.0</flume-ng.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
+ <version>${flume-ng.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.thoughtworks.paranamer</groupId>
+ <artifactId>paranamer</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.tukaani</groupId>
+ <artifactId>xz</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.velocity</groupId>
+ <artifactId>velocity</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jetty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Override artifactSet configuration to build fat-jar with all dependencies packed. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.3</version>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <!-- We include all dependencies that transitively depend on guava -->
+ <include>org.apache.flume:*</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
new file mode 100644
index 0000000..50f5770
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.flume;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.api.RpcClient;
+import org.apache.flume.api.RpcClientFactory;
+import org.apache.flume.event.EventBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FlumeSink<IN> extends RichSinkFunction<IN> {
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlumeSink.class);
+
+ private transient FlinkRpcClientFacade client;
+ boolean initDone = false;
+ String host;
+ int port;
+ SerializationSchema<IN, byte[]> schema;
+
+ public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
+ this.host = host;
+ this.port = port;
+ this.schema = schema;
+ }
+
+ /**
+ * Receives tuples from the Apache Flink {@link DataStream} and forwards
+ * them to Apache Flume.
+ *
+ * @param value
+ * The tuple arriving from the datastream
+ */
+ @Override
+ public void invoke(IN value) {
+
+ byte[] data = schema.serialize(value);
+ client.sendDataToFlume(data);
+
+ }
+
+ private class FlinkRpcClientFacade {
+ private RpcClient client;
+ private String hostname;
+ private int port;
+
+ /**
+ * Initializes the connection to Apache Flume.
+ *
+ * @param hostname
+ * The host
+ * @param port
+ * The port.
+ */
+ public void init(String hostname, int port) {
+ // Setup the RPC connection
+ this.hostname = hostname;
+ this.port = port;
+ int initCounter = 0;
+ while (true) {
+ if (initCounter >= 90) {
+ throw new RuntimeException("Cannot establish connection with" + port + " at "
+ + host);
+ }
+ try {
+ this.client = RpcClientFactory.getDefaultInstance(hostname, port);
+ } catch (FlumeException e) {
+ // Wait one second if the connection failed before the next
+ // try
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Interrupted while trying to connect {} at {}", port, host);
+ }
+ }
+ }
+ if (client != null) {
+ break;
+ }
+ initCounter++;
+ }
+ initDone = true;
+ }
+
+ /**
+ * Sends byte arrays as {@link Event} series to Apache Flume.
+ *
+ * @param data
+ * The byte array to send to Apache Flume
+ */
+ public void sendDataToFlume(byte[] data) {
+ Event event = EventBuilder.withBody(data);
+
+ try {
+ client.append(event);
+
+ } catch (EventDeliveryException e) {
+ // clean up and recreate the client
+ client.close();
+ client = null;
+ client = RpcClientFactory.getDefaultInstance(hostname, port);
+ }
+ }
+
+ }
+
+ @Override
+ public void close() {
+ client.client.close();
+ }
+
+ @Override
+ public void open(Configuration config) {
+ client = new FlinkRpcClientFacade();
+ client.init(host, port);
+ }
+}
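A hypothetical usage sketch for the sink above; the host, port, and inline schema are placeholders, with the schema mirroring the StringToByteSerializer from the (commented-out) FlumeTopology in this commit:

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.flume.FlumeSink;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

public class FlumeSinkUsage {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c").addSink(
                new FlumeSink<String>("localhost", 42424, new SerializationSchema<String, byte[]>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public byte[] serialize(String element) {
                        return element.getBytes();
                    }
                }));

        env.execute();
    }
}
{% endhighlight %}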
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
new file mode 100644
index 0000000..8fecd0f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -0,0 +1,149 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import java.util.List;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+//import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+//import org.apache.flink.util.Collector;
+//import org.apache.flume.Context;
+//import org.apache.flume.channel.ChannelProcessor;
+//import org.apache.flume.source.AvroSource;
+//import org.apache.flume.source.avro.AvroFlumeEvent;
+//import org.apache.flume.source.avro.Status;
+//
+//public class FlumeSource<OUT> extends ConnectorSource<OUT> {
+// private static final long serialVersionUID = 1L;
+//
+// String host;
+// String port;
+// volatile boolean finished = false;
+//
+// private volatile boolean isRunning = false;
+//
+// FlumeSource(String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+// super(deserializationSchema);
+// this.host = host;
+// this.port = Integer.toString(port);
+// }
+//
+// public class MyAvroSource extends AvroSource {
+// Collector<OUT> output;
+//
+// /**
+// * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that should be sent to the dataStream
+// * @return A {@link Status}.OK message if sending the event was
+// * successful.
+// */
+// @Override
+// public Status append(AvroFlumeEvent avroEvent) {
+// collect(avroEvent);
+// return Status.OK;
+// }
+//
+// /**
+// * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param events
+// * The events that are sent to the dataStream
+// * @return A Status.OK message if sending the events was successful.
+// */
+// @Override
+// public Status appendBatch(List<AvroFlumeEvent> events) {
+// for (AvroFlumeEvent avroEvent : events) {
+// collect(avroEvent);
+// }
+//
+// return Status.OK;
+// }
+//
+// /**
+// * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+// * {@link DataStream}.
+// *
+// * @param avroEvent
+// * The event that is sent to the dataStream
+// */
+// private void collect(AvroFlumeEvent avroEvent) {
+// byte[] b = avroEvent.getBody().array();
+// OUT out = FlumeSource.this.schema.deserialize(b);
+//
+// if (schema.isEndOfStream(out)) {
+// FlumeSource.this.finished = true;
+// this.stop();
+// FlumeSource.this.notifyAll();
+// } else {
+// output.collect(out);
+// }
+//
+// }
+//
+// }
+//
+// MyAvroSource avroSource;
+//
+// /**
+// * Configures the AvroSource. Also sets the output so the application can
+// * use it from outside of the invoke function.
+// *
+// * @param output
+// * The output used in the invoke function
+// */
+// public void configureAvroSource(Collector<OUT> output) {
+//
+// avroSource = new MyAvroSource();
+// avroSource.output = output;
+// Context context = new Context();
+// context.put("port", port);
+// context.put("bind", host);
+// avroSource.configure(context);
+// // An instance of a ChannelProcessor is required for configuring the
+// // avroSource although it will not be used in this case.
+// ChannelProcessor cp = new ChannelProcessor(null);
+// avroSource.setChannelProcessor(cp);
+// }
+//
+// /**
+// * Configures the AvroSource and runs until the user calls a close function.
+// *
+// * @param output
+// * The Collector for sending data to the datastream
+// */
+// @Override
+// public void run(Collector<OUT> output) throws Exception {
+// isRunning = true;
+// configureAvroSource(output);
+// avroSource.start();
+// while (!finished && isRunning) {
+// this.wait();
+// }
+// }
+//
+// @Override
+// public void cancel() {
+// isRunning = false;
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
new file mode 100644
index 0000000..f630bce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-flume/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -0,0 +1,49 @@
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements. See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.connectors.flume;
+//
+//import org.apache.flink.streaming.api.datastream.DataStream;
+//import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+//import org.apache.flink.streaming.util.serialization.SerializationSchema;
+//import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+//
+//public class FlumeTopology {
+//
+// public static void main(String[] args) throws Exception {
+//
+// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+//
+// @SuppressWarnings("unused")
+// DataStream<String> dataStream1 = env.addSource(
+// new FlumeSource<String>("localhost", 41414, new SimpleStringSchema())).addSink(
+// new FlumeSink<String>("localhost", 42424, new StringToByteSerializer()));
+//
+// env.execute();
+// }
+//
+// public static class StringToByteSerializer implements SerializationSchema<String, byte[]> {
+//
+// private static final long serialVersionUID = 1L;
+//
+// @Override
+// public byte[] serialize(String element) {
+// return element.getBytes();
+// }
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
new file mode 100644
index 0000000..e9c9da2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-connectors-parent</artifactId>
+ <version>0.9-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-connector-kafka</artifactId>
+ <name>flink-connector-kafka</name>
+
+ <packaging>jar</packaging>
+
+ <!-- Allow users to pass custom connector versions -->
+ <properties>
+ <kafka.version>0.8.2.0</kafka.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_${scala.binary.version}</artifactId>
+ <version>${kafka.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.sf.jopt-simple</groupId>
+ <artifactId>jopt-simple</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-annotation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.7.1</version>
+ </dependency>
+
+ </dependencies>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
new file mode 100644
index 0000000..fe6684d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+
+public class KafkaConsumerExample {
+
+ private static String host;
+ private static int port;
+ private static String topic;
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+
+ DataStream<String> kafkaStream = env
+ .addSource(new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()));
+ kafkaStream.print();
+
+ env.execute();
+ }
+
+ private static boolean parseParameters(String[] args) {
+ if (args.length == 3) {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ topic = args[2];
+ return true;
+ } else {
+ System.err.println("Usage: KafkaConsumerExample <host> <port> <topic>");
+ return false;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
new file mode 100644
index 0000000..4dd5577
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+
+public class KafkaProducerExample {
+
+ private static String host;
+ private static int port;
+ private static String topic;
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setParallelism(4);
+
+ @SuppressWarnings({ "unused", "serial" })
+ DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
+
+ private int index = 0;
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return index >= 20;
+ }
+
+ @Override
+ public String next() throws Exception {
+ if (index < 20) {
+ String result = "message #" + index;
+ index++;
+ return result;
+ }
+
+ return "q";
+ }
+
+ }).addSink(
+ new KafkaSink<String>(host + ":" + port, topic, new JavaDefaultStringSchema())
+ )
+ .setParallelism(3);
+
+ env.execute();
+ }
+
+ private static boolean parseParameters(String[] args) {
+ if (args.length == 3) {
+ host = args[0];
+ port = Integer.parseInt(args[1]);
+ topic = args[2];
+ return true;
+ } else {
+ System.err.println("Usage: KafkaProducerExample <host> <port> <topic>");
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
new file mode 100644
index 0000000..4286196
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.io.IOException;
+
+public class Utils {
+ public static class TypeInformationSerializationSchema<T>
+ implements DeserializationSchema<T>, SerializationSchema<T, byte[]> {
+ private final TypeSerializer<T> serializer;
+ private final TypeInformation<T> ti;
+
+ public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
+ this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
+ this.serializer = ti.createSerializer(ec);
+ }
+ @Override
+ public T deserialize(byte[] message) {
+ try {
+ return serializer.deserialize(new ByteArrayInputView(message));
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize message", e);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return false;
+ }
+
+ @Override
+ public byte[] serialize(T element) {
+ DataOutputSerializer dos = new DataOutputSerializer(16);
+ try {
+ serializer.serialize(element, dos);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+ return dos.getByteArray();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return ti;
+ }
+ }
+}
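TypeInformationSerializationSchema is symmetric, so one instance can serve both a KafkaSink and the matching KafkaSource. A standalone sketch of the round trip (the sample tuple is only there so the constructor can extract the type):

{% highlight java %}
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.Utils;

public class SchemaRoundTrip {
    public static void main(String[] args) {
        ExecutionConfig ec = new ExecutionConfig();

        // The constructor derives TypeInformation from a sample object.
        Utils.TypeInformationSerializationSchema<Tuple2<Integer, String>> schema =
                new Utils.TypeInformationSerializationSchema<Tuple2<Integer, String>>(
                        new Tuple2<Integer, String>(1, "one"), ec);

        byte[] bytes = schema.serialize(new Tuple2<Integer, String>(42, "answer"));
        Tuple2<Integer, String> back = schema.deserialize(bytes);
        System.out.println(back); // prints (42,answer)
    }
}
{% endhighlight %}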
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
new file mode 100644
index 0000000..0965b29
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
+import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.DefaultEncoder;
+
+
+/**
+ * Sink that emits its inputs to a Kafka topic.
+ *
+ * @param <IN>
+ * Type of the sink input
+ */
+public class KafkaSink<IN> extends RichSinkFunction<IN> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
+ private Producer<IN, byte[]> producer;
+ private Properties userDefinedProperties;
+ private String topicId;
+ private String brokerList;
+ private SerializationSchema<IN, byte[]> schema;
+ private SerializableKafkaPartitioner partitioner;
+ private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
+
+ /**
+ * Creates a KafkaSink for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param brokerList
+ * Addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ */
+ public KafkaSink(String brokerList, String topicId,
+ SerializationSchema<IN, byte[]> serializationSchema) {
+ this(brokerList, topicId, new Properties(), serializationSchema);
+ }
+
+ /**
+ * Creates a KafkaSink for a given topic with custom Producer configuration.
+ * If you use this constructor, the broker should be set with the "metadata.broker.list"
+ * configuration.
+ *
+ * @param brokerList
+ * Addresses of the brokers
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param producerConfig
+ * Configurations of the Kafka producer
+ * @param serializationSchema
+ * User defined serialization schema.
+ */
+ public KafkaSink(String brokerList, String topicId, Properties producerConfig,
+ SerializationSchema<IN, byte[]> serializationSchema) {
+ String[] elements = brokerList.split(",");
+ for(String broker: elements) {
+ NetUtils.ensureCorrectHostnamePort(broker);
+ }
+ Preconditions.checkNotNull(topicId, "TopicID not set");
+
+ this.brokerList = brokerList;
+ this.topicId = topicId;
+ this.schema = serializationSchema;
+ this.partitionerClass = null;
+ this.userDefinedProperties = producerConfig;
+ }
+
+ /**
+ * Creates a KafkaSink for a given topic. The sink produces its input to
+ * the topic.
+ *
+ * @param brokerList
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param serializationSchema
+ * User defined serialization schema.
+ * @param partitioner
+ * User defined partitioner.
+ */
+ public KafkaSink(String brokerList, String topicId,
+ SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+ this(brokerList, topicId, serializationSchema);
+ ClosureCleaner.ensureSerializable(partitioner);
+ this.partitioner = partitioner;
+ }
+
+ public KafkaSink(String brokerList,
+ String topicId,
+ SerializationSchema<IN, byte[]> serializationSchema,
+ Class<? extends SerializableKafkaPartitioner> partitioner) {
+ this(brokerList, topicId, serializationSchema);
+ this.partitionerClass = partitioner;
+ }
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ @Override
+ public void open(Configuration configuration) {
+
+ Properties properties = new Properties();
+
+ properties.put("metadata.broker.list", brokerList);
+ properties.put("request.required.acks", "-1");
+ properties.put("message.send.max.retries", "10");
+
+ properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
+
+ // this will not be used as the key will not be serialized
+ properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+
+ for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
+ properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
+ }
+
+ if (partitioner != null) {
+ properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+ // java serialization will do the rest.
+ properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+ }
+ if (partitionerClass != null) {
+ properties.put("partitioner.class", partitionerClass);
+ }
+
+ ProducerConfig config = new ProducerConfig(properties);
+
+ try {
+ producer = new Producer<IN, byte[]>(config);
+ } catch (NullPointerException e) {
+ throw new RuntimeException("Cannot connect to Kafka broker " + brokerList, e);
+ }
+ }
+
+ /**
+ * Called when new data arrives to the sink, and forwards it to Kafka.
+ *
+ * @param next
+ * The incoming data
+ */
+ @Override
+ public void invoke(IN next) {
+ byte[] serialized = schema.serialize(next);
+
+ // Sending message without serializable key.
+ producer.send(new KeyedMessage<IN, byte[]>(topicId, null, next, serialized));
+ }
+
+ @Override
+ public void close() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+}
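The Properties-based constructor lets callers tune the underlying producer before open() fills in the defaults. A sketch (the broker list and topic are placeholders; the keys are standard Kafka 0.8 producer settings):

{% highlight java %}
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;

public class KafkaSinkConfig {
    public static KafkaSink<String> buildSink() {
        Properties producerConfig = new Properties();
        producerConfig.put("request.required.acks", "1"); // relax the "-1" default set in open()
        producerConfig.put("producer.type", "async");     // batch sends in the background

        return new KafkaSink<String>("broker1:9092,broker2:9092", "output-topic",
                producerConfig, new JavaDefaultStringSchema());
    }
}
{% endhighlight %}

Note that open() applies the user-defined properties last, so they win over the built-in defaults.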
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
new file mode 100644
index 0000000..4a7ec15
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Preconditions;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.ConnectorSource;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Source that listens to a Kafka topic using the high level Kafka API.
+ *
+ * @param <OUT>
+ * Type of the messages on the topic.
+ */
+public class KafkaSource<OUT> extends ConnectorSource<OUT> {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+ private final String zookeeperAddress;
+ private final String groupId;
+ private final String topicId;
+ private Properties customProperties;
+
+ private transient ConsumerConnector consumer;
+ private transient ConsumerIterator<byte[], byte[]> consumerIterator;
+
+ private long zookeeperSyncTimeMillis;
+ private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
+ private static final String DEFAULT_GROUP_ID = "flink-group";
+
+ // We must read this in reachedEnd() to check for the end. We keep it to return it in
+ // next()
+ private OUT nextElement;
+
+ /**
+ * Creates a KafkaSource that consumes a topic.
+ *
+ * @param zookeeperAddress
+ * Address of the Zookeeper host (with port number).
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param groupId
+ * ID of the consumer group.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ * @param zookeeperSyncTimeMillis
+ * Synchronization time with zookeeper.
+ */
+ public KafkaSource(String zookeeperAddress,
+ String topicId, String groupId,
+ DeserializationSchema<OUT> deserializationSchema,
+ long zookeeperSyncTimeMillis) {
+ this(zookeeperAddress, topicId, groupId, deserializationSchema, zookeeperSyncTimeMillis, null);
+ }
+ /**
+ * Creates a KafkaSource that consumes a topic.
+ *
+ * @param zookeeperAddress
+ * Address of the Zookeeper host (with port number).
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param groupId
+ * ID of the consumer group.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ * @param zookeeperSyncTimeMillis
+ * Synchronization time with zookeeper.
+ * @param customProperties
+ * Custom properties for Kafka
+ */
+ public KafkaSource(String zookeeperAddress,
+ String topicId, String groupId,
+ DeserializationSchema<OUT> deserializationSchema,
+ long zookeeperSyncTimeMillis, Properties customProperties) {
+ super(deserializationSchema);
+ Preconditions.checkNotNull(zookeeperAddress, "ZK address is null");
+ Preconditions.checkNotNull(topicId, "Topic ID is null");
+ Preconditions.checkNotNull(deserializationSchema, "deserializationSchema is null");
+ Preconditions.checkArgument(zookeeperSyncTimeMillis >= 0, "The ZK sync time must be non-negative");
+
+ this.zookeeperAddress = zookeeperAddress;
+ this.groupId = groupId;
+ this.topicId = topicId;
+ this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
+ this.customProperties = customProperties;
+ }
+
+ /**
+ * Creates a KafkaSource that consumes a topic.
+ *
+ * @param zookeeperAddress
+ * Address of the Zookeeper host (with port number).
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ * @param zookeeperSyncTimeMillis
+ * Synchronization time with zookeeper.
+ */
+ public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
+ this(zookeeperAddress, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis, null);
+ }
+ /**
+ * Creates a KafkaSource that consumes a topic.
+ *
+ * @param zookeeperAddress
+ * Address of the Zookeeper host (with port number).
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ */
+ public KafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema) {
+ this(zookeeperAddress, topicId, deserializationSchema, ZOOKEEPER_DEFAULT_SYNC_TIME);
+ }
+
+ /**
+ * Initializes the connection to Kafka.
+ */
+ private void initializeConnection() {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", zookeeperAddress);
+ props.put("group.id", groupId);
+ props.put("zookeeper.session.timeout.ms", "10000");
+ props.put("zookeeper.sync.time.ms", Long.toString(zookeeperSyncTimeMillis));
+ props.put("auto.commit.interval.ms", "1000");
+
+ if(customProperties != null) {
+ for(Map.Entry<Object, Object> e : customProperties.entrySet()) {
+ if(props.containsKey(e.getKey())) {
+ LOG.warn("Overwriting property " + e.getKey() + " with value " + e.getValue());
+ }
+ props.put(e.getKey(), e.getValue());
+ }
+ }
+
+ consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
+
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
+ .createMessageStreams(Collections.singletonMap(topicId, 1));
+ List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
+ KafkaStream<byte[], byte[]> stream = streams.get(0);
+
+ consumer.commitOffsets();
+
+ consumerIterator = stream.iterator();
+ }
+
+ @Override
+ public void open(Configuration config) throws Exception {
+ super.open(config);
+ initializeConnection();
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (consumer != null) {
+ consumer.shutdown();
+ }
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ if (nextElement != null) {
+ return false;
+ } else if (consumerIterator.hasNext()) {
+ OUT out = schema.deserialize(consumerIterator.next().message());
+ if (schema.isEndOfStream(out)) {
+ return true;
+ }
+ nextElement = out;
+ }
+ return false;
+ }
+
+ @Override
+ public OUT next() throws Exception {
+ if (!reachedEnd()) {
+ OUT result = nextElement;
+ nextElement = null;
+ return result;
+ } else {
+ throw new RuntimeException("Source exhausted");
+ }
+ }
+
+}
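The reachedEnd()/next() pair is a pull contract: reachedEnd() may pre-fetch one deserialized record into nextElement, and next() hands it out. A sketch of how a caller would drive the source (an illustration of the contract, not Flink's actual task loop; the runtime is assumed to have called open() already):

{% highlight java %}
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;

public class SourceDriver {
    // Drains an already-opened source, printing as a stand-in for emitting downstream.
    public static <T> void drain(KafkaSource<T> source) throws Exception {
        while (!source.reachedEnd()) {    // may buffer one record internally
            T record = source.next();     // returns (and clears) the buffered record
            System.out.println(record);
        }
    }
}
{% endhighlight %}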
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
new file mode 100644
index 0000000..7ae17df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/config/PartitionerWrapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.api.config;
+
+import kafka.producer.Partitioner;
+import kafka.utils.VerifiableProperties;
+
+/**
+ * Hacky wrapper to send an object instance through a Properties map.
+ *
+ * This works as follows:
+ * The recommended way of creating a KafkaSink is specifying a classname for the partitioner.
+ *
+ * Otherwise (if the user gave a (serializable) class instance), we give Kafka the PartitionerWrapper class of Flink.
+ * This is set in the key-value (java.util.Properties) map.
+ * In addition to that, we use Properties.put(Object, Object) to store the instance of the (serializable) partitioner.
+ * This is a hack because the put() method is called on the underlying Hashtable.
+ *
+ * This PartitionerWrapper is called with the Properties. From there, we extract the wrapped Partitioner instance.
+ *
+ * The serializable partitioner instance is serialized into the Properties Hashtable and also deserialized from there.
+ */
+public class PartitionerWrapper implements Partitioner {
+ public final static String SERIALIZED_WRAPPER_NAME = "flink.kafka.wrapper.serialized";
+
+ private Partitioner wrapped;
+ public PartitionerWrapper(VerifiableProperties properties) {
+ wrapped = (Partitioner) properties.props().get(SERIALIZED_WRAPPER_NAME);
+ }
+
+ @Override
+ public int partition(Object value, int numberOfPartitions) {
+ return wrapped.partition(value, numberOfPartitions);
+ }
+}
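
The constant and the "partitioner.class" property wired below are the real ones from the class above; the wrapping method and its partitioner argument are assumptions for illustration. A sketch of the producer-side setup the Javadoc describes:

    import java.util.Properties;

    import kafka.producer.Partitioner;

    import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;

    public class PartitionerWrapperExample {
        // "partitioner" stands in for any user-supplied (serializable) instance.
        static Properties wire(Partitioner partitioner) {
            Properties props = new Properties();
            // Kafka instantiates the wrapper reflectively via the standard
            // 0.8 producer setting "partitioner.class" ...
            props.setProperty("partitioner.class", PartitionerWrapper.class.getName());
            // ... and the wrapper pulls the real instance back out of the map.
            // put(Object, Object) writes to the underlying Hashtable, bypassing
            // the String-only setProperty() contract; that is the "hack".
            props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
            return props;
        }
    }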
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
new file mode 100644
index 0000000..032ed08
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.persistent;
+
+import com.google.common.base.Preconditions;
+import kafka.common.TopicAndPartition;
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Source for reading from Kafka using Flink Streaming Fault Tolerance.
+ * This source updates the committed offsets in Zookeeper based on Flink's internal checkpointing.
+ *
+ * Note that Kafka's autocommit feature must be disabled to use this source.
+ */
+public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements
+ ResultTypeQueryable<OUT>,
+ CheckpointCommitter,
+ CheckpointedAsynchronously<long[]> {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+
+ protected transient ConsumerConfig consumerConfig;
+ private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
+ private transient ConsumerConnector consumer;
+
+ private String topicName;
+ private DeserializationSchema<OUT> deserializationSchema;
+ private boolean running = true;
+
+ private transient long[] lastOffsets;
+ private transient ZkClient zkClient;
+ private transient long[] committedOffsets; // maintain committed offsets, to avoid committing the same offset over and over again.
+
+ // We set this in reachedEnd to carry it over to next()
+ private OUT nextElement = null;
+
+ /**
+ * Creates a persistent Kafka source.
+ *
+ * The consumerConfig must specify at least the "group.id" and "zookeeper.connect" settings;
+ * the config is passed on to the Kafka High Level Consumer.
+ * For a full list of possible settings, see https://kafka.apache.org/documentation.html#consumerconfigs
+ */
+ public PersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) {
+ Preconditions.checkNotNull(topicName);
+ Preconditions.checkNotNull(deserializationSchema);
+ Preconditions.checkNotNull(consumerConfig);
+
+ this.topicName = topicName;
+ this.deserializationSchema = deserializationSchema;
+ this.consumerConfig = consumerConfig;
+ if(consumerConfig.autoCommitEnable()) {
+ throw new IllegalArgumentException("'auto.commit.enable' is set to 'true'. " +
+ "This source can only be used with auto commit disabled because the " +
+ "source is committing to zookeeper by itself (not using the KafkaConsumer).");
+ }
+ if(!consumerConfig.offsetsStorage().equals("zookeeper")) {
+ // we can currently only commit to ZK.
+ throw new IllegalArgumentException("The 'offsets.storage' has to be set to 'zookeeper' for this Source to work reliably");
+ }
+ }
+
+ // ---------------------- ParallelSourceFunction Lifecycle -----------------
+
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ ConsumerConnector consumer = Consumer.createJavaConsumerConnector(this.consumerConfig);
+ // we request only one stream per consumer instance. Kafka will make sure that each consumer group
+ // will see each message only once.
+ Map<String,Integer> topicCountMap = Collections.singletonMap(topicName, 1);
+ Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
+ if(streams.size() != 1) {
+ throw new RuntimeException("Expected only one message stream but got "+streams.size());
+ }
+ List<KafkaStream<byte[], byte[]>> kafkaStreams = streams.get(topicName);
+ if(kafkaStreams == null) {
+ throw new RuntimeException("Requested stream not available. Available streams: "+streams.toString());
+ }
+ if(kafkaStreams.size() != 1) {
+ throw new RuntimeException("Requested 1 stream from Kafka, bot got "+kafkaStreams.size()+" streams");
+ }
+ LOG.info("Opening Consumer instance for topic '{}' on group '{}'", topicName, consumerConfig.groupId());
+ this.iteratorToRead = kafkaStreams.get(0).iterator();
+ this.consumer = consumer;
+
+ zkClient = new ZkClient(consumerConfig.zkConnect(),
+ consumerConfig.zkSessionTimeoutMs(),
+ consumerConfig.zkConnectionTimeoutMs(),
+ new KafkaZKStringSerializer());
+
+ // the arrays are sized for all partitions of the topic; this parallel instance
+ // will most likely read only a subset of them, so some entries stay at their initial value.
+ int numPartitions = getNumberOfPartitions();
+ LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
+ this.lastOffsets = new long[numPartitions];
+ this.committedOffsets = new long[numPartitions];
+ Arrays.fill(this.lastOffsets, -1);
+ Arrays.fill(this.committedOffsets, 0); // explicit, even though new long[] is zero-initialized
+
+ nextElement = null;
+ }
+
+ @Override
+ public boolean reachedEnd() throws Exception {
+ if (nextElement != null) {
+ return false;
+ }
+
+ while (iteratorToRead.hasNext()) {
+ MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next();
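+ // guard against re-delivery by the high-level consumer: after a rebalance it
+ // resumes from the last offset committed to Zookeeper, which may lag behind
+ // what this instance has already emitted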
+ if(lastOffsets[message.partition()] >= message.offset()) {
+ LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition());
+ continue;
+ }
+ lastOffsets[message.partition()] = message.offset();
+
+ OUT out = deserializationSchema.deserialize(message.message());
+ if (deserializationSchema.isEndOfStream(out)) {
+ LOG.info("DeserializationSchema signaled end of stream for this source");
+ break;
+ }
+
+ nextElement = out;
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Processed record with offset {} from partition {}", message.offset(), message.partition());
+ }
+ break;
+ }
+
+ return nextElement == null;
+ }
+
+ @Override
+ public OUT next() throws Exception {
+ if (!reachedEnd()) {
+ OUT result = nextElement;
+ nextElement = null;
+ return result;
+ } else {
+ throw new RuntimeException("Source exhausted");
+ }
+ }
+
+ @Override
+ public void close() {
+ LOG.info("Closing Kafka consumer");
+ this.consumer.shutdown();
+ zkClient.close();
+ }
+
+
+ // ---------------------- State / Checkpoint handling -----------------
+ // this source is keeping the partition offsets in Zookeeper
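+ // flow: snapshotState() remembers the current read offsets under the checkpoint id;
+ // once the checkpoint completes, commitCheckpoint() writes those offsets to Zookeeper,
+ // so the consumer group resumes from the last completed checkpoint after a restart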
+
+ private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
+
+ @Override
+ public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+ if(lastOffsets == null) {
+ LOG.warn("State snapshot requested on not yet opened source. Returning null");
+ return null;
+ }
+ LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp);
+
+ long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length);
+ pendingCheckpoints.put(checkpointId, currentOffsets);
+ return currentOffsets;
+ }
+
+ @Override
+ public void restoreState(long[] state) {
+ // the offsets are maintained in Zookeeper, so there is nothing to restore here.
+ }
+
+
+ /**
+ * Notification on completed checkpoints
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ */
+ @Override
+ public void commitCheckpoint(long checkpointId) {
+ LOG.info("Commit checkpoint {}", checkpointId);
+ long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
+ if(checkpointOffsets == null) {
+ LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
+ return;
+ }
+ LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
+
+ for(int partition = 0; partition < checkpointOffsets.length; partition++) {
+ long offset = checkpointOffsets[partition];
+ if(offset != -1) {
+ setOffset(partition, offset);
+ }
+ }
+ }
+
+ // --------------------- Zookeeper / Offset handling -----------------------------
+
+ private int getNumberOfPartitions() {
+ scala.collection.immutable.List<String> scalaSeq = JavaConversions.asScalaBuffer(Collections.singletonList(topicName)).toList();
+ scala.collection.mutable.Map<String, Seq<Object>> list = ZkUtils.getPartitionsForTopics(zkClient, scalaSeq);
+ Option<Seq<Object>> topicOption = list.get(topicName);
+ if(topicOption.isEmpty()) {
+ throw new IllegalArgumentException("Unable to get number of partitions for topic "+topicName+" from "+list.toString());
+ }
+ Seq<Object> topic = topicOption.get();
+ return topic.size();
+ }
+
+ protected void setOffset(int partition, long offset) {
+ if(committedOffsets[partition] < offset) {
+ setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset);
+ committedOffsets[partition] = offset;
+ } else {
+ LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition);
+ }
+ }
+
+
+
+ // the following two methods are static to allow access from the outside as well (Testcases)
+
+ /**
+ * This method's code is based on ZookeeperConsumerConnector.commitOffsetToZooKeeper()
+ */
+ public static void setOffset(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
+ LOG.info("Setting offset for partition {} of topic {} in group {} to offset {}", partition, topic, groupId, offset);
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+ ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition(), Long.toString(offset));
+ }
+
+ public static long getOffset(ZkClient zkClient, String groupId, String topic, int partition) {
+ TopicAndPartition tap = new TopicAndPartition(topic, partition);
+ ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, tap.topic());
+ scala.Tuple2<String, Stat> data = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir() + "/" + tap.partition());
+ return Long.parseLong(data._1());
+ }
+
+
+ // ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeObject(consumerConfig.props().props());
+ }
+
+ private void readObject(ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ Properties props = (Properties) in.readObject();
+ consumerConfig = new ConsumerConfig(props);
+ }
+
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deserializationSchema.getProducedType();
+ }
+
+
+ // ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there) -----------------
+
+ public static class KafkaZKStringSerializer implements ZkSerializer {
+
+ @Override
+ public byte[] serialize(Object data) throws ZkMarshallingError {
+ try {
+ return ((String) data).getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+ if (bytes == null) {
+ return null;
+ } else {
+ try {
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
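
For orientation, a minimal construction sketch for the source above. The topic, group and Zookeeper address are placeholders and the helper class is invented for the example; the two explicit settings are the ones the constructor enforces:

    import java.util.Properties;

    import kafka.consumer.ConsumerConfig;

    import org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class PersistentKafkaSourceExample {
        static PersistentKafkaSource<String> create(DeserializationSchema<String> schema) {
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder address
            props.setProperty("group.id", "flink-group");             // placeholder group
            props.setProperty("auto.commit.enable", "false"); // enforced by the constructor
            props.setProperty("offsets.storage", "zookeeper"); // enforced by the constructor
            return new PersistentKafkaSource<String>(
                    "my-topic", schema, new ConsumerConfig(props));
        }
    }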
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
new file mode 100644
index 0000000..661d0bd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaConstantPartitioner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+public class KafkaConstantPartitioner implements SerializableKafkaPartitioner {
+
+ private static final long serialVersionUID = 1L;
+ private int partition;
+
+ public KafkaConstantPartitioner(int partition) {
+ this.partition = partition;
+ }
+
+ @Override
+ public int partition(Object value, int numberOfPartitions) {
+ return partition;
+ }
+}
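
Usage is a one-liner; for instance, to pin every record to partition 0 (the surrounding class is just scaffolding for the example):

    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaConstantPartitioner;

    public class ConstantPartitionerExample {
        public static void main(String[] args) {
            // every value maps to partition 0, whatever the partition count
            KafkaConstantPartitioner partitioner = new KafkaConstantPartitioner(0);
            System.out.println(partitioner.partition("any-value", 4)); // prints 0
        }
    }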
http://git-wip-us.apache.org/repos/asf/flink/blob/665bcec7/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
new file mode 100644
index 0000000..77a774e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/SerializableKafkaPartitioner.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import kafka.producer.Partitioner;
+import java.io.Serializable;
+
+public interface SerializableKafkaPartitioner extends Serializable, Partitioner {
+
+}
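
Since the interface only combines Serializable with Kafka's Partitioner, a custom implementation is a few lines. A sketch (the class is an example, not part of this commit):

    import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;

    // Example: spread records across partitions by value hash. Being
    // Serializable, an instance can be shipped with the job graph and handed
    // to Kafka through the PartitionerWrapper shown earlier.
    public class HashKafkaPartitioner implements SerializableKafkaPartitioner {
        private static final long serialVersionUID = 1L;

        @Override
        public int partition(Object value, int numberOfPartitions) {
            // mask the sign bit so negative hash codes still yield a valid partition
            return (value.hashCode() & Integer.MAX_VALUE) % numberOfPartitions;
        }
    }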