You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/03/10 15:00:08 UTC

[08/14] flink git commit: [FLINK-1638] [streaming] Added Kafka topic creator and custom offset consumer

[FLINK-1638] [streaming] Added Kafka topic creator and custom offset consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7485c23
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7485c23
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7485c23

Branch: refs/heads/master
Commit: e7485c235eb3ed79b5427f4cb1225185fb3408ae
Parents: 0ccb87c
Author: Gábor Hermann <re...@gmail.com>
Authored: Wed Mar 4 12:03:14 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Mar 10 14:58:49 2015 +0100

----------------------------------------------------------------------
 .../connectors/kafka/KafkaConsumerExample.java  |  4 +-
 .../api/simple/KafkaCustomOffsetSource.java     | 46 ++++++++++++++
 .../kafka/api/simple/KafkaTopicCreator.java     | 67 ++++++++++++++++++++
 .../kafka/api/simple/SimpleKafkaSource.java     | 12 +++-
 4 files changed, 127 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index 587d7b1..754b2b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
 import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
 
@@ -41,7 +42,8 @@ public class KafkaConsumerExample {
 		DataStream<String> stream1 = env
 				.addSource(
 //						new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
-						new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
+//						new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
+						new KafkaCustomOffsetSource<String>(topic, host, port, new JavaDefaultStringSchema()))
 				.setParallelism(3)
 				.print().setParallelism(3);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
new file mode 100644
index 0000000..d90ff7c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+
+/**
+ * A {@link SimpleKafkaSource} that starts consuming at a fixed offset and
+ * prints every received message, prefixed with its offset, to standard output.
+ */
+public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> {
+
+	/** Offset the consumer iterator is initialized from when the source opens. */
+	private static final int INITIAL_OFFSET = 10;
+
+	/**
+	 * Builds the source by handing all arguments on to {@link SimpleKafkaSource}.
+	 *
+	 * @param topicId
+	 *            Kafka topic to read from
+	 * @param host
+	 *            host used for the Kafka connection
+	 * @param port
+	 *            port used for the Kafka connection
+	 * @param deserializationSchema
+	 *            schema that turns a raw message into an {@code OUT} element
+	 */
+	public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
+		super(topicId, host, port, deserializationSchema);
+	}
+
+	/** Positions the consumer iterator at {@link #INITIAL_OFFSET}; {@code config} is not used. */
+	@Override
+	protected void setInitialOffset(Configuration config) {
+		iterator.initializeFromOffset(INITIAL_OFFSET);
+	}
+
+	/** Prints {@code <offset> :: <deserialized message>} for the given message. */
+	@Override
+	protected void gotMessage(MessageWithOffset msg) {
+		StringBuilder line = new StringBuilder();
+		line.append(msg.getOffset());
+		line.append(" :: ");
+		line.append(schema.deserialize(msg.getMessage()));
+		System.out.println(line.toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
new file mode 100644
index 0000000..9e12492
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.api.simple;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+
+/**
+ * Utility for creating Kafka topics through ZooKeeper by delegating to
+ * {@link AdminUtils#createTopic}.
+ */
+public class KafkaTopicCreator {
+
+	/**
+	 * Creates a topic with an empty topic-level configuration, using 10000 ms
+	 * for both the ZooKeeper session timeout and the connection timeout.
+	 *
+	 * @param zookeeperServer
+	 *            connection string handed to the {@link ZkClient}
+	 * @param topicName
+	 *            name of the topic to create
+	 * @param numOfPartitions
+	 *            number of partitions of the new topic
+	 * @param replicationFactor
+	 *            replication factor of the new topic
+	 */
+	public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {
+		createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000);
+	}
+
+	/**
+	 * Creates a topic through a ZooKeeper client that is opened for this call
+	 * and closed again afterwards, even when the topic creation fails.
+	 *
+	 * @param zookeeperServer
+	 *            connection string handed to the {@link ZkClient}
+	 * @param topicName
+	 *            name of the topic to create
+	 * @param numOfPartitions
+	 *            number of partitions of the new topic
+	 * @param replicationFactor
+	 *            replication factor of the new topic
+	 * @param topicProperties
+	 *            topic-level configuration forwarded to Kafka; {@code null} is
+	 *            treated as an empty configuration
+	 * @param sessionTimeoutMs
+	 *            ZooKeeper session timeout in milliseconds
+	 * @param connectionTimeoutMs
+	 *            ZooKeeper connection timeout in milliseconds
+	 */
+	public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor, Properties topicProperties, int sessionTimeoutMs, int connectionTimeoutMs) {
+		// Forward the caller's configuration instead of silently dropping it.
+		Properties topicConfig = (topicProperties != null) ? topicProperties : new Properties();
+
+		ZkClient zkClient = new ZkClient(zookeeperServer, sessionTimeoutMs, connectionTimeoutMs,
+				new KafkaZKStringSerializer());
+		try {
+			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+		} finally {
+			// The client is only needed for this call; release its connection.
+			zkClient.close();
+		}
+	}
+
+	/**
+	 * Serializer that converts between {@link String} values and their UTF-8
+	 * bytes for the {@link ZkClient}.
+	 */
+	private static class KafkaZKStringSerializer implements ZkSerializer {
+
+		/** Encodes the given value, which must be a {@link String}, as UTF-8. */
+		@Override
+		public byte[] serialize(Object data) throws ZkMarshallingError {
+			try {
+				return ((String) data).getBytes("UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+
+		/** Decodes UTF-8 bytes into a string; {@code null} input yields {@code null}. */
+		@Override
+		public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+			if (bytes == null) {
+				return null;
+			}
+			try {
+				return new String(bytes, "UTF-8");
+			} catch (UnsupportedEncodingException e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7485c23/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
index db75571..a721dee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java
@@ -29,13 +29,14 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 	private String topicId;
 	private final String host;
 	private final int port;
-	private KafkaConsumerIterator iterator;
+	protected KafkaConsumerIterator iterator;
 
 	/**
 	 * Partition index is set automatically by instance id.
 	 * @param topicId
 	 * @param host
 	 * @param port
+	 * @param deserializationSchema
 	 */
 	public SimpleKafkaSource(String topicId,
 							 String host, int port, DeserializationSchema<OUT> deserializationSchema) {
@@ -48,21 +49,30 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
 	private void initializeConnection() {
 		int partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
 		iterator = new KafkaConsumerIterator(host, port, topicId, 0, 100L);
+	}
+
+	protected void setInitialOffset(Configuration config) {
 		iterator.initializeFromCurrent();
 	}
 
+	protected void gotMessage(MessageWithOffset msg) {
+	}
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke(Collector<OUT> collector) throws Exception {
 		while (iterator.hasNext()) {
 			MessageWithOffset msg = iterator.nextWithOffset();
+			gotMessage(msg);
 			OUT out = schema.deserialize(msg.getMessage());
 			collector.collect(out);
 		}
 	}
 
+
 	@Override
 	public void open(Configuration config) {
 		initializeConnection();
+		setInitialOffset(config);
 	}
 }