You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:08 UTC
[08/14] flink git commit: [FLINK-1638] [streaming] Added Kafka topic
creator and custom offset consumer
[FLINK-1638] [streaming] Added Kafka topic creator and custom offset consumer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7485c23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7485c23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7485c23
Branch: refs/heads/master
Commit: e7485c235eb3ed79b5427f4cb1225185fb3408ae
Parents: 0ccb87c
Author: Gábor Hermann <re...@gmail.com>
Authored: Wed Mar 4 12:03:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100
----------------------------------------------------------------------
.../connectors/kafka/KafkaConsumerExample.java | 4 +-
.../api/simple/KafkaCustomOffsetSource.java | 46 ++++++++++++++
.../kafka/api/simple/KafkaTopicCreator.java | 67 ++++++++++++++++++++
.../kafka/api/simple/SimpleKafkaSource.java | 12 +++-
4 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 587d7b1..754b2b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
@@ -41,7 +42,8 @@ public class KafkaConsumerExample {
DataStream<String> stream1 = env
.addSource(
// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
- new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
+// new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
+ new KafkaCustomOffsetSource<String>(topic, host, port, new JavaDefaultStringSchema()))
.setParallelism(3)
.print().setParallelism(3);
http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
new file mode 100644
index 0000000..d90ff7c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+/**
+ * {@link SimpleKafkaSource} variant that starts reading at an explicit offset and prints every
+ * received message (offset and deserialized value) to stdout.
+ */
+public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> {
+
+	/** Start offset used by the constructor that does not take an explicit offset. */
+	private static final long DEFAULT_INITIAL_OFFSET = 10L;
+
+	/** Offset handed to the iterator in {@link #setInitialOffset(Configuration)}. */
+	private final long initialOffset;
+
+	/**
+	 * Creates a source that starts reading at offset 10.
+	 *
+	 * @param topicId
+	 *            topic to read from
+	 * @param host
+	 *            host passed on to {@link SimpleKafkaSource}
+	 * @param port
+	 *            port passed on to {@link SimpleKafkaSource}
+	 * @param deserializationSchema
+	 *            schema used to turn raw message bytes into {@code OUT} values
+	 */
+	public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+		this(topicId, host, port, deserializationSchema, DEFAULT_INITIAL_OFFSET);
+	}
+
+	/**
+	 * Creates a source that starts reading at the given offset.
+	 *
+	 * @param topicId
+	 *            topic to read from
+	 * @param host
+	 *            host passed on to {@link SimpleKafkaSource}
+	 * @param port
+	 *            port passed on to {@link SimpleKafkaSource}
+	 * @param deserializationSchema
+	 *            schema used to turn raw message bytes into {@code OUT} values
+	 * @param initialOffset
+	 *            offset from which the iterator starts reading
+	 */
+	public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema, long initialOffset) {
+		super(topicId, host, port, deserializationSchema);
+		this.initialOffset = initialOffset;
+	}
+
+	/** Positions the iterator at the configured offset instead of the current one. */
+	@Override
+	protected void setInitialOffset(Configuration config) {
+		iterator.initializeFromOffset(initialOffset);
+	}
+
+	/** Prints the message offset and its deserialized value to stdout (demo/debug output). */
+	@Override
+	protected void gotMessage(MessageWithOffset msg) {
+		System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage()));
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
new file mode 100644
index 0000000..9e12492
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+
+/**
+ * Creates Kafka topics through {@link AdminUtils}, using a {@link ZkClient} connection that is
+ * opened for the call and closed afterwards.
+ */
+public class KafkaTopicCreator {
+
+	/**
+	 * Creates a topic with empty topic properties and 10000 ms ZooKeeper session and connection timeouts.
+	 *
+	 * @param zookeeperServer
+	 *            ZooKeeper connect string passed to {@link ZkClient}
+	 * @param topicName
+	 *            name of the topic to create
+	 * @param numOfPartitions
+	 *            number of partitions of the new topic
+	 * @param replicationFactor
+	 *            replication factor of the new topic
+	 */
+	public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {
+		createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000);
+	}
+
+	/**
+	 * Creates a topic with the given topic-level properties and ZooKeeper timeouts.
+	 *
+	 * @param zookeeperServer
+	 *            ZooKeeper connect string passed to {@link ZkClient}
+	 * @param topicName
+	 *            name of the topic to create
+	 * @param numOfPartitions
+	 *            number of partitions of the new topic
+	 * @param replicationFactor
+	 *            replication factor of the new topic
+	 * @param topicProperties
+	 *            topic configuration passed to {@link AdminUtils#createTopic}; {@code null} is treated as empty
+	 * @param sessionTimeoutMs
+	 *            ZooKeeper session timeout in milliseconds
+	 * @param connectionTimeoutMs
+	 *            ZooKeeper connection timeout in milliseconds
+	 */
+	public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) {
+		// Forward the caller's topic configuration instead of discarding it; null keeps the old "empty config" behavior.
+		Properties topicConfig = topicProperties != null ? topicProperties : new Properties();
+
+		ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs,
+				new KafkaZKStringSerializer());
+		try {
+			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+		} finally {
+			// Release the ZooKeeper connection even when topic creation fails.
+			zkClient.close();
+		}
+	}
+
+	/** {@link ZkSerializer} that stores node data as UTF-8 encoded strings. */
+	private static class KafkaZKStringSerializer implements ZkSerializer {
+
+		@Override
+		public byte[] serialize(Object data) throws ZkMarshallingError {
+			try {
+				return ((String) data).getBytes("UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+			if (bytes == null) {
+				return null;
+			}
+			try {
+				return new String(bytes, "UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index db75571..a721dee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -29,13 +29,14 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
private String topicId;
private final String host;
private final int port;
- private KafkaConsumerIterator iterator;
+ /** Consumer iterator; protected because subclasses (e.g. KafkaCustomOffsetSource) position it in setInitialOffset. */
+ protected KafkaConsumerIterator iterator;
/**
* Partition index is set automatically by instance id.
* @param topicId
* @param host
* @param port
+ * @param deserializationSchema
*/
public SimpleKafkaSource(String topicId,
String host, int port, DeserializationSchema<OUT> deserializationSchema) {
@@ -48,21 +49,30 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
+ /**
+ * Creates the KafkaConsumerIterator for this source's topic, host and port.
+ * NOTE(review): partitionIndex is computed but never used; the literal 0 (presumably the
+ * partition) is passed instead, although the constructor javadoc says the partition index is
+ * set by instance id. TODO confirm intended behavior.
+ */
private void initializeConnection() {
int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
+ }
+
+ /**
+ * Chooses where the iterator starts reading; invoked from open() right after initializeConnection().
+ * The default calls initializeFromCurrent(); subclasses may override to start elsewhere.
+ *
+ * @param config configuration passed to open(); unused by the default implementation
+ */
+ protected void setInitialOffset(Configuration config) {
iterator.initializeFromCurrent();
}
+	/**
+	 * Hook invoked by invoke() for every raw message, before it is deserialized and emitted.
+	 *
+	 * @param msg the received message together with its offset
+	 */
+	protected void gotMessage(MessageWithOffset msg) {
+		// Intentionally a no-op; subclasses override to observe messages.
+	}
+
+ /**
+ * Reads messages while the iterator reports more, hands each one to gotMessage, then
+ * deserializes it with the schema and emits the result to the collector.
+ *
+ * @param collector receives the deserialized records
+ */
@SuppressWarnings("unchecked")
@Override
public void invoke(Collector<OUT> collector) throws Exception {
while (iterator.hasNext()) {
MessageWithOffset msg = iterator.nextWithOffset();
+ gotMessage(msg);
OUT out = schema.deserialize(msg.getMessage());
collector.collect(out);
}
}
+
+ /**
+ * Builds the consumer iterator, then lets setInitialOffset choose where reading starts.
+ */
@Override
public void open(Configuration config) {
initializeConnection();
+ setInitialOffset(config);
}
}