Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/16 09:56:40 UTC
flink git commit: [FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource.
Repository: flink
Updated Branches:
refs/heads/master 4754a97b1 -> 354922bed
[FLINK-1753] Extend KafkaITCase with large messages test + rework PersistentKafkaSource.
This closes #603
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354922be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354922be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354922be
Branch: refs/heads/master
Commit: 354922bed7ffe5482513ac53b5ef56bfbca8bd9a
Parents: 4754a97
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Apr 13 16:23:58 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Thu Apr 16 09:55:51 2015 +0200
----------------------------------------------------------------------
.../common/functions/util/FunctionUtils.java | 12 +-
.../examples/java/wordcount/WordCount.java | 2 +-
.../typeutils/runtime/ByteArrayInputView.java | 40 ++++++
.../java/typeutils/runtime/TestInputView.java | 40 ------
.../streaming/connectors/ConnectorSource.java | 4 +
.../flink/streaming/connectors/kafka/Utils.java | 71 ++++++++++
.../kafka/api/simple/PersistentKafkaSource.java | 97 ++++++++-----
.../KafkaMultiplePartitionsIterator.java | 14 +-
.../iterator/KafkaSinglePartitionIterator.java | 31 ++---
.../streaming/connectors/kafka/KafkaITCase.java | 135 ++++++++++++++++++-
.../functions/source/GenericSourceFunction.java | 2 +-
.../streaming/api/operators/StreamOperator.java | 1 -
.../serialization/DeserializationSchema.java | 4 +-
13 files changed, 334 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
index 2486074..62e07ac 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/FunctionUtils.java
@@ -27,28 +27,28 @@ import org.apache.flink.configuration.Configuration;
public class FunctionUtils {
- public static void openFunction (Function function, Configuration parameters) throws Exception{
+ public static void openFunction(Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
- richFunction.open (parameters);
+ richFunction.open(parameters);
}
}
- public static void closeFunction (Function function) throws Exception{
+ public static void closeFunction(Function function) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
- richFunction.close ();
+ richFunction.close();
}
}
- public static void setFunctionRuntimeContext (Function function, RuntimeContext context){
+ public static void setFunctionRuntimeContext(Function function, RuntimeContext context){
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.setRuntimeContext(context);
}
}
- public static RuntimeContext getFunctionRuntimeContext (Function function, RuntimeContext defaultContext){
+ public static RuntimeContext getFunctionRuntimeContext(Function function, RuntimeContext defaultContext){
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
return richFunction.getRuntimeContext();
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
index bfd5e85..7db7946 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java
@@ -63,7 +63,7 @@ public class WordCount {
// get input data
DataSet<String> text = getTextDataSet(env);
-
+
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
new file mode 100644
index 0000000..48d6a3d
--- /dev/null
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ByteArrayInputView.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.core.memory.DataInputView;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+public class ByteArrayInputView extends DataInputStream implements DataInputView {
+
+ public ByteArrayInputView(byte[] data) {
+ super(new ByteArrayInputStream(data));
+ }
+
+ @Override
+ public void skipBytesToRead(int numBytes) throws IOException {
+ while (numBytes > 0) {
+ int skipped = skipBytes(numBytes);
+ numBytes -= skipped;
+ }
+ }
+}
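
The new ByteArrayInputView lets any Flink TypeSerializer read records straight out of a byte array. A minimal usage sketch, assuming the bytes were written by the same serializer (the helper class below is illustrative, not part of this commit):

    import java.io.IOException;
    import org.apache.flink.api.common.typeutils.base.LongSerializer;
    import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;

    public class ByteArrayInputViewExample {
        // Reads a single Long that LongSerializer previously wrote into 'data'.
        public static Long readLong(byte[] data) throws IOException {
            return LongSerializer.INSTANCE.deserialize(new ByteArrayInputView(data));
        }
    }
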
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
deleted file mode 100644
index e5ef665..0000000
--- a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestInputView.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.core.memory.DataInputView;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-
-class TestInputView extends DataInputStream implements DataInputView {
-
- public TestInputView(byte[] data) {
- super(new ByteArrayInputStream(data));
- }
-
- @Override
- public void skipBytesToRead(int numBytes) throws IOException {
- while (numBytes > 0) {
- int skipped = skipBytes(numBytes);
- numBytes -= skipped;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1e645cd..caabb21 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.functions.source.GenericSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -35,6 +36,9 @@ public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OU
@Override
public TypeInformation<OUT> getType() {
+ if(schema instanceof ResultTypeQueryable) {
+ return ((ResultTypeQueryable<OUT>) schema).getProducedType();
+ }
return TypeExtractor.createTypeInfo(DeserializationSchema.class, schema.getClass(), 0,
null, null);
}
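
With this change, a DeserializationSchema that also implements ResultTypeQueryable hands its type information to the source directly, bypassing the reflective TypeExtractor. A minimal sketch (the class name and String type are illustrative):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;

    public class StringSchema implements DeserializationSchema<String>, ResultTypeQueryable<String> {

        @Override
        public String deserialize(byte[] message) {
            // assumes the platform default charset; a real schema should pin one
            return new String(message);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false; // this schema never signals end-of-stream
        }

        @Override
        public TypeInformation<String> getProducedType() {
            // returned by ConnectorSource.getType() without reflection
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
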
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
new file mode 100644
index 0000000..a29ba4d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+
+import java.io.IOException;
+
+public class Utils {
+ public static class TypeInformationSerializationSchema<T>
+ implements DeserializationSchema<T>, SerializationSchema<T, byte[]>, ResultTypeQueryable<T> {
+ private final TypeSerializer<T> serializer;
+ private final TypeInformation<T> ti;
+
+ public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) {
+ this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type);
+ this.serializer = ti.createSerializer(ec);
+ }
+ @Override
+ public T deserialize(byte[] message) {
+ try {
+ return serializer.deserialize(new ByteArrayInputView(message));
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize message", e);
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return false;
+ }
+
+ @Override
+ public byte[] serialize(T element) {
+ DataOutputSerializer dos = new DataOutputSerializer(16);
+ try {
+ serializer.serialize(element, dos);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+ return dos.getByteArray();
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return ti;
+ }
+ }
+}
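
Because the schema derives a TypeSerializer from a sample object, serialize() and deserialize() are guaranteed to agree on the wire format. A round-trip sketch, mirroring how the test below instantiates it (values are illustrative):

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.Utils;

    public class SchemaRoundTrip {
        public static void main(String[] args) {
            // The sample tuple passed to the constructor is used only for type extraction.
            Utils.TypeInformationSerializationSchema<Tuple2<Long, String>> schema =
                new Utils.TypeInformationSerializationSchema<Tuple2<Long, String>>(
                    new Tuple2<Long, String>(0L, ""), new ExecutionConfig());

            byte[] bytes = schema.serialize(new Tuple2<Long, String>(42L, "flink"));
            Tuple2<Long, String> copy = schema.deserialize(bytes); // == (42, "flink")
        }
    }
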
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index cb89248..f9d9508 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -17,13 +17,18 @@
package org.apache.flink.streaming.connectors.kafka.api.simple;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;
+import java.util.Properties;
import com.google.common.base.Preconditions;
-
+import kafka.consumer.ConsumerConfig;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaConsumerIterator;
import org.apache.flink.streaming.connectors.kafka.api.simple.iterator.KafkaIdleConsumerIterator;
@@ -33,7 +38,6 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
@@ -50,25 +54,19 @@ import org.slf4j.LoggerFactory;
public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
-
private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
+ public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
+
private final String topicId;
- private final String zookeeperServerAddress;
- private final int zookeeperSyncTimeMillis;
- private final int waitOnEmptyFetchMillis;
private final KafkaOffset startingOffset;
-
- private int connectTimeoutMs = 100000;
- private int bufferSize = 64 * 1024;
+ private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
private transient KafkaConsumerIterator iterator;
private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
private transient Map<Integer, KafkaOffset> partitions;
- private volatile boolean isRunning = false;
-
/**
* Creates a persistent Kafka source that consumes a topic.
* If there are no new messages on the topic, this consumer will wait
@@ -109,11 +107,14 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
this(zookeeperAddress, topicId, deserializationSchema, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis, Offset.FROM_CURRENT);
}
+
/**
* Creates a persistent Kafka source that consumes a topic.
* If there are no new messages on the topic, this consumer will wait
* waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
*
+ * THIS CONSTRUCTOR IS DEPRECATED: use the constructor that takes a ConsumerConfig instead.
+ *
* @param zookeeperAddress
* Address of the Zookeeper host (with port number).
* @param topicId
@@ -127,18 +128,44 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
* @param startOffsetType
* The offset to start from (beginning or current).
*/
- public PersistentKafkaSource(String zookeeperAddress, String topicId,
- DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis,
- int waitOnEmptyFetchMillis, Offset startOffsetType) {
+ @Deprecated
+ public PersistentKafkaSource(String zookeeperAddress, String topicId, DeserializationSchema<OUT> deserializationSchema, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis, Offset startOffsetType) {
+ this(topicId, deserializationSchema, startOffsetType, legacyParametersToConsumerConfig(zookeeperAddress, zookeeperSyncTimeMillis, waitOnEmptyFetchMillis));
+ }
+
+ private static ConsumerConfig legacyParametersToConsumerConfig(String zookeeperAddress, int zookeeperSyncTimeMillis, int waitOnEmptyFetchMillis) {
+ Properties props = new Properties();
+ props.setProperty("zookeeper.sync.time.ms", Integer.toString(zookeeperSyncTimeMillis));
+ props.setProperty(WAIT_ON_EMPTY_FETCH_KEY, Integer.toString(waitOnEmptyFetchMillis));
+ props.setProperty("zookeeper.connect", zookeeperAddress);
+ props.setProperty("group.id", "flink-persistent-kafka-source");
+ return new ConsumerConfig(props);
+ }
+
+ /**
+ * Creates a persistent Kafka source that consumes a topic.
+ * If there are no new messages on the topic, this consumer will wait
+ * waitOnEmptyFetchMillis milliseconds before trying to fetch messages again.
+ *
+ * @param topicId
+ * ID of the Kafka topic.
+ * @param deserializationSchema
+ * User defined deserialization schema.
+ * @param startOffsetType
+ * The offset to start from (beginning or current).
+ * @param consumerConfig
+ * Additional configuration for the PersistentKafkaSource.
+ * NOTE: This source will only respect certain configuration values from the config!
+ */
+ public PersistentKafkaSource(String topicId, DeserializationSchema<OUT> deserializationSchema, Offset startOffsetType, ConsumerConfig consumerConfig) {
super(deserializationSchema);
- Preconditions.checkNotNull(zookeeperAddress, "The Zookeeper address can not be null");
Preconditions.checkNotNull(topicId, "The topic id can not be null");
Preconditions.checkNotNull(deserializationSchema, "The deserialization schema can not be null");
- Preconditions.checkArgument(zookeeperSyncTimeMillis > 0, "The sync time must be positive");
- Preconditions.checkArgument(waitOnEmptyFetchMillis > 0, "The wait time must be positive");
+ Preconditions.checkNotNull(consumerConfig, "ConsumerConfig can not be null");
+
+ this.consumerConfig = consumerConfig;
this.topicId = topicId;
- this.zookeeperServerAddress = zookeeperAddress;
switch (startOffsetType) {
case FROM_BEGINNING:
@@ -151,20 +178,17 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
this.startingOffset = new CurrentOffset();
break;
}
-
- this.zookeeperSyncTimeMillis = zookeeperSyncTimeMillis;
- this.waitOnEmptyFetchMillis = waitOnEmptyFetchMillis;
}
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws InterruptedException {
+ LOG.info("Starting PersistentKafkaSource");
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
int indexOfSubtask = context.getIndexOfThisSubtask();
int numberOfSubtasks = context.getNumberOfParallelSubtasks();
- KafkaTopicUtils kafkaTopicUtils =
- new KafkaTopicUtils(zookeeperServerAddress, zookeeperSyncTimeMillis, zookeeperSyncTimeMillis);
+ KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(consumerConfig.zkConnect(), consumerConfig.zkSyncTimeMs(), consumerConfig.zkConnectionTimeoutMs());
int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
@@ -187,10 +211,10 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
context.registerState("kafka", kafkaOffSet);
}
- iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize);
+ iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);
if (LOG.isInfoEnabled()) {
- LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
+ LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
}
}
@@ -200,9 +224,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
@Override
public void run(Collector<OUT> collector) throws Exception {
- isRunning = true;
MessageWithMetadata msg;
- while (isRunning && iterator.hasNext()) {
+ while (iterator.hasNext()) {
msg = iterator.nextWithOffset();
OUT out = schema.deserialize(msg.getMessage());
@@ -218,17 +241,21 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
}
- public void setConnectTimeoutMs(int connectTimeoutMs) {
- Preconditions.checkArgument(connectTimeoutMs > 0, "The timeout must be positive");
- this.connectTimeoutMs = connectTimeoutMs;
+ @Override
+ public void cancel() {
+ LOG.info("PersistentKafkaSource has been cancelled");
}
- public void setBufferSize(int bufferSize) {
- Preconditions.checkArgument(connectTimeoutMs > 0, "The buffer size must be positive");
- this.bufferSize = bufferSize;
+ private void writeObject(ObjectOutputStream out)
+ throws IOException, ClassNotFoundException {
+ out.defaultWriteObject();
+ out.writeObject(consumerConfig.props().props());
}
- @Override
- public void cancel() {
+ private void readObject(ObjectInputStream in)
+ throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ Properties props = (Properties) in.readObject();
+ consumerConfig = new ConsumerConfig(props);
}
}
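
After the rework, all connection settings travel through Kafka's own ConsumerConfig, which the source keeps serializable by writing out its backing Properties. A construction sketch, reusing the StringSchema sketch shown earlier (host, group id, and topic name are illustrative):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;
    import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
    import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;

    public class SourceSetup {
        public static PersistentKafkaSource<String> create() {
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // required by ConsumerConfig
            props.setProperty("group.id", "my-flink-job");            // required by ConsumerConfig
            props.setProperty(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, "100");

            return new PersistentKafkaSource<String>(
                "myTopic", new StringSchema(), Offset.FROM_CURRENT, new ConsumerConfig(props));
        }
    }
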
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index b76421e..9da1bea 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -21,8 +21,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,15 +34,14 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
protected List<KafkaSinglePartitionIterator> partitions;
- protected final int waitOnEmptyFetch;
+ protected final ConsumerConfig consumerConfig;
public KafkaMultiplePartitionsIterator(String topic,
Map<Integer, KafkaOffset> partitionsWithOffset,
- KafkaTopicUtils kafkaTopicUtils,
- int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
+ KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
- this.waitOnEmptyFetch = waitOnEmptyFetch;
+ this.consumerConfig = consumerConfig;
for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
partitions.add(new KafkaSinglePartitionIterator(
@@ -48,8 +49,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
partitionWithOffset.getKey(),
partitionWithOffset.getValue(),
kafkaTopicUtils,
- connectTimeoutMs,
- bufferSize));
+ this.consumerConfig));
}
}
@@ -91,7 +91,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
// do not wait if a new message has been fetched
if (!gotNewMessage) {
try {
- Thread.sleep(waitOnEmptyFetch);
+ Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for new messages", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 6e326e5..0aaa771 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
@@ -64,13 +65,11 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private List<String> replicaBrokers;
private String clientName;
private Broker leadBroker;
- private final int connectTimeoutMs;
- private final int bufferSize;
+ private final ConsumerConfig consumerConfig;
private KafkaOffset initialOffset;
private transient Iterator<MessageAndOffset> iter;
private transient FetchResponse fetchResponse;
- private volatile boolean isRunning;
/**
* Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
@@ -84,19 +83,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
* Offset to start consuming at
* @param kafkaTopicUtils
* Util for receiving topic metadata
- * @param connectTimeoutMs
- * Connection timeout in milliseconds
- * @param bufferSize
- * Size of buffer
*/
public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
- KafkaTopicUtils kafkaTopicUtils, int connectTimeoutMs, int bufferSize) {
+ KafkaTopicUtils kafkaTopicUtils, ConsumerConfig consumerConfig) {
Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
this.hosts = new ArrayList<String>(brokerAddresses);
- this.connectTimeoutMs = connectTimeoutMs;
- this.bufferSize = bufferSize;
+ this.consumerConfig = consumerConfig;
this.topic = topic;
this.partition = partition;
@@ -119,7 +113,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
}
PartitionMetadata metadata;
- isRunning = true;
do {
metadata = findLeader(hosts, topic, partition);
try {
@@ -127,8 +120,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
} catch (InterruptedException e) {
throw new InterruptedException("Establishing connection to Kafka failed");
}
- } while (isRunning && metadata == null);
- isRunning = false;
+ } while (metadata == null);
if (metadata.leader() == null) {
throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
@@ -137,13 +129,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
leadBroker = metadata.leader();
clientName = "Client_" + topic + "_" + partition;
- consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+ consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
try {
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
} catch (NotLeaderForPartitionException e) {
do {
-
metadata = findLeader(hosts, topic, partition);
try {
@@ -254,7 +245,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
- .addFetch(topic, partition, offset, 100000).build();
+ .addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
fetchResponse = consumer.fetch(req);
@@ -283,10 +274,9 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition);
- consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), 100000, 64 * 1024, clientName);
+ consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
}
- @SuppressWarnings("ConstantConditions")
private PartitionMetadata findLeader(List<String> addresses, String a_topic,
int a_partition) {
@@ -304,7 +294,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
SimpleConsumer consumer = null;
try {
- consumer = new SimpleConsumer(host, port, connectTimeoutMs, bufferSize, "leaderLookup");
+ consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
@@ -343,7 +333,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
return returnMetaData;
}
- @SuppressWarnings({"ConstantConditions", "UnusedAssignment"})
private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
for (int i = 0; i < 3; i++) {
if (LOG.isInfoEnabled()) {
@@ -370,7 +359,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
}
}
}
- throw new InterruptedException("Unable to find new leader after Broker failure.");
+ throw new RuntimeException("Unable to find new leader after Broker failure.");
}
}
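
The single-partition iterator now takes its SimpleConsumer parameters from the ConsumerConfig instead of hard-coded values. A sketch of the consumer properties it reads and the accessors they map to (values are illustrative):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class IteratorConfig {
        public static ConsumerConfig create() {
            Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181");
            props.setProperty("group.id", "my-flink-job");
            props.setProperty("socket.timeout.ms", "100000");          // socketTimeoutMs()
            props.setProperty("socket.receive.buffer.bytes", "65536"); // socketReceiveBufferBytes()
            props.setProperty("fetch.message.max.bytes", "31457280");  // fetchMessageMaxBytes(), 30 MB
            return new ConsumerConfig(props);
        }
    }
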
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 8205799..9ecd0a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -28,16 +28,21 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Properties;
+import java.util.Random;
+import kafka.consumer.ConsumerConfig;
import org.apache.commons.lang.SerializationUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
@@ -158,7 +163,7 @@ public class KafkaITCase {
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got " + value);
+ LOG.debug("Got value = " + value);
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
@@ -237,14 +242,14 @@ public class KafkaITCase {
// add consuming topology:
DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
new PersistentKafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, new TupleSerializationSchema(), 5000, 100, Offset.FROM_BEGINNING));
- consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
+ consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
int elCnt = 0;
int start = -1;
BitSet validator = new BitSet(101);
@Override
public void invoke(Tuple2<Long, String> value) throws Exception {
- LOG.debug("Got " + value);
+ LOG.info("Got value " + value);
String[] sp = value.f1.split("-");
int v = Integer.parseInt(sp[1]);
@@ -265,6 +270,12 @@ public class KafkaITCase {
throw new SuccessException();
}
}
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ Assert.assertTrue("No element received", elCnt > 0);
+ }
});
// add producing topology
@@ -278,6 +289,8 @@ public class KafkaITCase {
int cnt = 0;
while (running) {
collector.collect(new Tuple2<Long, String>(1000L + cnt, "kafka-" + cnt++));
+ LOG.info("Produced " + cnt);
+
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
@@ -311,6 +324,117 @@ public class KafkaITCase {
LOG.info("Finished KafkaITCase.tupleTestTopology()");
}
+ /**
+ * Test Flink's Kafka integration also with very big records (30MB)
+ *
+ * see http://stackoverflow.com/questions/21020347/kafka-sending-a-15mb-message
+ *
+ * @throws Exception
+ */
+ @Test
+ public void bigRecordTestTopology() throws Exception {
+
+ LOG.info("Starting KafkaITCase.bigRecordTestTopology()");
+
+ String topic = "bigRecordTestTopic";
+ createTestTopic(topic, 1, 1);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ // add consuming topology:
+ Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
+ consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
+ consumerProps.setProperty("group.id", "test");
+
+ ConsumerConfig cc = new ConsumerConfig(consumerProps);
+ DataStreamSource<Tuple2<Long, byte[]>> consuming = env.addSource(
+ new PersistentKafkaSource<Tuple2<Long, byte[]>>(topic, serSchema, Offset.FROM_BEGINNING, cc));
+
+ consuming.addSink(new SinkFunction<Tuple2<Long, byte[]>>() {
+ int elCnt = 0;
+
+ @Override
+ public void invoke(Tuple2<Long, byte[]> value) throws Exception {
+ elCnt++;
+ if(value.f0 == -1) {
+ // we should have seen 11 elements now.
+ if(elCnt == 11) {
+ throw new SuccessException();
+ } else {
+ throw new RuntimeException("There have been "+elCnt+" elements");
+ }
+ }
+ if(elCnt > 10) {
+ throw new RuntimeException("More than 10 elements seen: "+elCnt);
+ }
+ }
+ });
+
+ // add producing topology
+ DataStream<Tuple2<Long, byte[]>> stream = env.addSource(new RichSourceFunction<Tuple2<Long, byte[]>>() {
+ private static final long serialVersionUID = 1L;
+ boolean running = true;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+ @Override
+ public void run(Collector<Tuple2<Long, byte[]>> collector) throws Exception {
+ LOG.info("Starting source.");
+ long cnt = 0;
+ Random rnd = new Random(1337);
+ while (running) {
+ //
+ byte[] wl = new byte[Math.abs(rnd.nextInt(1024 * 1024 * 30))];
+ collector.collect(new Tuple2<Long, byte[]>(cnt++, wl));
+ LOG.info("Emitted cnt=" + (cnt - 1) + " with byte.length = " + wl.length);
+
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ if(cnt == 10) {
+ // signal end
+ collector.collect(new Tuple2<Long, byte[]>(-1L, new byte[]{1}));
+ running = false;
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ LOG.info("Source got cancel()");
+ running = false;
+ }
+ });
+
+ stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(zookeeperConnectionString, topic,
+ new Utils.TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+ );
+
+ try {
+ env.setParallelism(1);
+ env.execute();
+ } catch (JobExecutionException good) {
+ Throwable t = good.getCause();
+ int limit = 0;
+ while (!(t instanceof SuccessException)) {
+ t = t.getCause();
+ if (limit++ == 20) {
+ LOG.warn("Test failed with exception", good);
+ Assert.fail("Test failed with: " + good.getMessage());
+ }
+ }
+ }
+
+ LOG.info("Finished KafkaITCase.bigRecordTestTopology()");
+ }
+
+
private static boolean partitionerHasBeenCalled = false;
@Test
@@ -616,8 +740,7 @@ public class KafkaITCase {
// check if everything in the bitset is set to true
int nc;
if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
-// throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
- System.out.println("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
}
throw new SuccessException();
} else if (elCnt == numOfMessagesToReceive) {
@@ -698,6 +821,8 @@ public class KafkaITCase {
kafkaProperties.put("broker.id", Integer.toString(brokerId));
kafkaProperties.put("log.dir", tmpFolder.toString());
kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+ kafkaProperties.put("message.max.bytes", "" + (35 * 1024 * 1024));
+ kafkaProperties.put("replica.fetch.max.bytes", "" + (35 * 1024 * 1024));
KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
index 72a18d0..0113cfe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/GenericSourceFunction.java
@@ -21,5 +21,5 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
public interface GenericSourceFunction<T> {
- public TypeInformation<T> getType();
+ TypeInformation<T> getType();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 7ec0b0b..6d6c793 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -56,7 +56,6 @@ public abstract class StreamOperator<IN, OUT> implements Serializable {
protected TypeSerializer<IN> objectSerializer;
protected StreamRecord<IN> nextRecord;
protected IN nextObject;
- protected boolean isMutable;
public Collector<OUT> collector;
protected Function userFunction;
http://git-wip-us.apache.org/repos/asf/flink/blob/354922be/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 08ef461..faaa79b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -28,7 +28,7 @@ public interface DeserializationSchema<T> extends Serializable {
* The incoming message in a byte array
* @return The deserialized message in the required format.
*/
- public T deserialize(byte[] message);
+ T deserialize(byte[] message);
/**
* Method to decide whether the element signals the end of the stream. If
@@ -38,5 +38,5 @@ public interface DeserializationSchema<T> extends Serializable {
* The element to test for end signal
* @return The end signal, if true the stream shuts down
*/
- public boolean isEndOfStream(T nextElement);
+ boolean isEndOfStream(T nextElement);
}