You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/17 21:03:23 UTC
[03/21] flink git commit: [FLINK-2863] [kafka connector] Kafka
connector propagates async producer exceptions
[FLINK-2863] [kafka connector] Kafka connector propagates async producer exceptions
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8eeb3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8eeb3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8eeb3bb
Branch: refs/heads/master
Commit: a8eeb3bb1314d6c6cc19001181539d74bc25f419
Parents: 728df39
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 16 18:46:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Oct 17 18:45:02 2015 +0200
----------------------------------------------------------------------
.../connectors/kafka/FlinkKafkaProducer.java | 115 ++++++++++++++-----
.../connectors/kafka/KafkaProducerTest.java | 114 ++++++++++++++++++
2 files changed, 203 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a8eeb3bb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3d666ee..715f5ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -25,19 +27,20 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.List;
+import java.util.Properties;
/**
@@ -67,26 +70,34 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
/**
* The name of the topic this producer is writing data to
*/
- private String topicId;
+ private final String topicId;
/**
* (Serializable) SerializationSchema for turning objects used with Flink into
* byte[] for Kafka.
*/
- private SerializationSchema<IN, byte[]> schema;
+ private final SerializationSchema<IN, byte[]> schema;
/**
* User-provided partitioner for assigning an object to a Kafka partition.
*/
- private KafkaPartitioner partitioner;
-
- // -------------------------------- Runtime fields ------------------------------------------
+ private final KafkaPartitioner partitioner;
/**
- * KafkaProducer instance.
+ * Flag indicating whether to accept failures (and log them), or to fail on failures
*/
+ private boolean logFailuresOnly;
+
+ // -------------------------------- Runtime fields ------------------------------------------
+
+ /** KafkaProducer instance */
private transient KafkaProducer<byte[], byte[]> producer;
+ /** The callback that handles error propagation or error logging for asynchronous sends */
+ private transient Callback callback;
+
+ /** Errors encountered in the async producer are stored here */
+ private transient volatile Exception asyncException;
/**
* Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
@@ -154,8 +165,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
// create a local KafkaProducer to get the list of partitions.
// this will also ensure locally that all required ProducerConfig values are set.
- {
- KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
+ try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
this.partitions = new int[partitionsList.size()];
@@ -165,24 +175,63 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
getPartitionsProd.close();
}
- if(customPartitioner == null) {
+ if (customPartitioner == null) {
this.partitioner = new FixedPartitioner();
} else {
this.partitioner = customPartitioner;
}
}
+ // ---------------------------------- Properties --------------------------
/**
+ * Chooses how failures reported asynchronously by the Kafka producer are handled.
+ *
+ * <p>With {@code true}, every failed send is only logged. With {@code false}, the first
+ * failure is remembered and rethrown on a later {@code invoke()} or in {@code close()},
+ * which fails the streaming program (and makes it enter recovery).
+ *
+ * <p>The flag is evaluated in {@code open()}, so it has to be set before the sink is opened.
+ *
+ * @param logFailuresOnly {@code true} to only log send failures, {@code false} to propagate them.
+ */
+ public void setLogFailuresOnly(boolean logFailuresOnly) {
+ this.logFailuresOnly = logFailuresOnly;
+ }
+
+ // ----------------------------------- Life cycle -------------------------
+
+ /**
* Initializes the connection to Kafka.
+ *
+ * <p>Creates the Kafka producer, opens the partitioner for this subtask, and installs the
+ * callback passed to every send: a log-only callback when {@link #setLogFailuresOnly(boolean)}
+ * was set to {@code true}, otherwise a callback that records the first send failure so that
+ * {@code invoke()} and {@code close()} can rethrow it via {@code checkErroneous()}.
+ *
+ * @param configuration Not used by this method.
*/
@Override
public void open(Configuration configuration) {
producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
- partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
-
- LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
+ RuntimeContext ctx = getRuntimeContext();
+ partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+
+ LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}",
+ ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
+
+ // pick the failure-handling strategy for asynchronous sends
+ if (logFailuresOnly) {
+ // log-only mode: a failed send is logged and otherwise ignored
+ callback = new Callback() {
+
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception e) {
+ if (e != null) {
+ LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+ }
+ }
+ };
+ }
+ else {
+ // propagating mode: keep only the first failure; checkErroneous() rethrows it later.
+ // NOTE(review): this presumably runs on the Kafka producer's I/O thread, which is why
+ // asyncException is declared volatile -- confirm against the Kafka producer docs.
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null && asyncException == null) {
+ asyncException = exception;
+ }
+ }
+ };
+ }
}
/**
@@ -192,27 +241,41 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN> {
* The incoming data
*/
@Override
- public void invoke(IN next) {
+ public void invoke(IN next) throws Exception {
+ // propagate asynchronous errors recorded by the send callback for earlier records
+ checkErroneous();
+
byte[] serialized = schema.serialize(next);
-
- producer.send(new ProducerRecord<byte[], byte[]>(topicId,
- partitioner.partition(next, partitions.length),
- null,
- serialized),
- new ErrorLoggingCallback(topicId, null, serialized, false));
+ // explicit target partition chosen by the partitioner; the record key is null
+ ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
+ partitioner.partition(next, partitions.length),
+ null, serialized);
+
+ // send() does not report failures here; they arrive at 'callback' (installed in open())
+ producer.send(record, callback);
}
+ /**
+ * Closes the Kafka producer, if one was created, and then rethrows a send failure that the
+ * asynchronous callback recorded but that has not been reported yet.
+ *
+ * @throws Exception wrapping the pending send failure, if any
+ */
@Override
- public void close() {
+ public void close() throws Exception {
if (producer != null) {
producer.close();
}
+
+ // make sure we propagate pending errors; presumably producer.close() has completed the
+ // outstanding sends (and run their callbacks) by now -- confirm for the Kafka version used
+ checkErroneous();
}
// ----------------------------------- Utilities --------------------------
+ /**
+ * Rethrows, at most once, the failure recorded by the asynchronous send callback.
+ *
+ * @throws Exception wrapping the recorded send failure, if there is one
+ */
+ private void checkErroneous() throws Exception {
+ Exception pending = asyncException;
+ if (pending == null) {
+ return;
+ }
+ // clear before throwing so the same failure is not reported a second time
+ asyncException = null;
+ throw new Exception("Failed to send data to Kafka: " + pending.getMessage(), pending);
+ }
+
public static Properties getPropertiesFromBrokerList(String brokerList) {
String[] elements = brokerList.split(",");
for(String broker: elements) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a8eeb3bb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..c5c3387
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducer.class)
+public class KafkaProducerTest extends TestLogger {
+
+ /**
+ * Checks that asynchronous send failures are rethrown by a propagating producer and are
+ * only logged by a producer configured with {@code setLogFailuresOnly(true)}.
+ * Unexpected exceptions are left to JUnit so that their full type and stack trace are reported.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testPropagateExceptions() throws Exception {
+ // mock kafka producer
+ KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+
+ // partition setup
+ when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+ Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+ // every send fails: the callback is completed immediately with an exception
+ when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+ .thenAnswer(new Answer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+ Callback callback = (Callback) invocation.getArguments()[1];
+ callback.onCompletion(null, new Exception("Test error"));
+ return null;
+ }
+ });
+
+ // make sure the FlinkKafkaProducer instantiates our mock producer
+ whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+
+ // (1) producer that propagates errors
+
+ FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
+ "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+
+ producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+ producerPropagating.open(new Configuration());
+
+ try {
+ // the first send records the failure, the next invoke() rethrows it
+ producerPropagating.invoke("value");
+ producerPropagating.invoke("value");
+ fail("This should fail with an exception");
+ }
+ catch (Exception e) {
+ assertNotNull(e.getCause());
+ assertNotNull(e.getCause().getMessage());
+ assertTrue(e.getCause().getMessage().contains("Test error"));
+ }
+
+ // the failure has already been reported, so closing must not rethrow it
+ producerPropagating.close();
+
+ // (2) producer that only logs errors
+
+ FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
+ "mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+ producerLogging.setLogFailuresOnly(true);
+
+ producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+ producerLogging.open(new Configuration());
+
+ // failures are only logged, so neither invoke() nor close() may throw
+ producerLogging.invoke("value");
+ producerLogging.invoke("value");
+ producerLogging.close();
+ }
+}