Posted to commits@flink.apache.org by se...@apache.org on 2015/10/17 21:03:23 UTC

[03/21] flink git commit: [FLINK-2863] [kafka connector] Kafka connector propagates async producer exceptions

[FLINK-2863] [kafka connector] Kafka connector propagates async producer exceptions
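
Before this change, the producer handed each record to Kafka with an
ErrorLoggingCallback, so exceptions from the asynchronous send path were
only logged and the streaming job kept running as if the writes had
succeeded. With this change, the first async exception is remembered and
rethrown from invoke() / close(), failing the job (and triggering
recovery), unless the user opts back into the old behavior via
setLogFailuresOnly(true).

A minimal usage sketch (topic name, broker address, and the surrounding
DataStream "stream" are illustrative, not part of this commit):

    Properties props = FlinkKafkaProducer.getPropertiesFromBrokerList("localhost:9092");

    FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<String>(
            "my-topic", new JavaDefaultStringSchema(), props, null);

    // default (false): async producer errors fail the job;
    // set to true to only log them instead
    sink.setLogFailuresOnly(false);

    stream.addSink(sink);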


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a8eeb3bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a8eeb3bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a8eeb3bb

Branch: refs/heads/master
Commit: a8eeb3bb1314d6c6cc19001181539d74bc25f419
Parents: 728df39
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 16 18:46:01 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Oct 17 18:45:02 2015 +0200

----------------------------------------------------------------------
 .../connectors/kafka/FlinkKafkaProducer.java    | 115 ++++++++++++++-----
 .../connectors/kafka/KafkaProducerTest.java     | 114 ++++++++++++++++++
 2 files changed, 203 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a8eeb3bb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 3d666ee..715f5ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -25,19 +27,20 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
+
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.List;
+import java.util.Properties;
 
 
 /**
@@ -67,26 +70,34 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	/**
 	 * The name of the topic this producer is writing data to
 	 */
-	private String topicId;
+	private final String topicId;
 
 	/**
 	 * (Serializable) SerializationSchema for turning objects used with Flink into
 	 * byte[] for Kafka.
 	 */
-	private SerializationSchema<IN, byte[]> schema;
+	private final SerializationSchema<IN, byte[]> schema;
 
 	/**
 	 * User-provided partitioner for assigning an object to a Kafka partition.
 	 */
-	private KafkaPartitioner partitioner;
-
-	// -------------------------------- Runtime fields ------------------------------------------
+	private final KafkaPartitioner partitioner;
 
 	/**
-	 * KafkaProducer instance.
+	 * Flag indicating whether send failures are only logged, or cause the sink to fail
 	 */
+	private boolean logFailuresOnly;
+	
+	// -------------------------------- Runtime fields ------------------------------------------
+
+	/** KafkaProducer instance */
 	private transient KafkaProducer<byte[], byte[]> producer;
 
+	/** The callback that handles error propagation or error logging */
+	private transient Callback callback;
+	
+	/** Errors encountered in the async producer are stored here */
+	private transient volatile Exception asyncException;
 
 	/**
 	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
@@ -154,8 +165,7 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 
 		// create a local KafkaProducer to get the list of partitions.
 		// this will also ensure locally that all required ProducerConfig values are set.
-		{
-			KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig);
+		try (KafkaProducer<Void, IN> getPartitionsProd = new KafkaProducer<>(this.producerConfig)) {
 			List<PartitionInfo> partitionsList = getPartitionsProd.partitionsFor(topicId);
 
 			this.partitions = new int[partitionsList.size()];
@@ -165,24 +175,63 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 			getPartitionsProd.close();
 		}
 
-		if(customPartitioner == null) {
+		if (customPartitioner == null) {
 			this.partitioner = new FixedPartitioner();
 		} else {
 			this.partitioner = customPartitioner;
 		}
 	}
 
+	// ---------------------------------- Properties --------------------------
 
 	/**
+	 * Defines whether the producer should fail on errors, or only log them.
+	 * If this is set to true, exceptions will only be logged; if set to false,
+	 * exceptions will eventually be thrown, causing the streaming program to
+	 * fail (and enter recovery).
+	 * 
+	 * @param logFailuresOnly The flag indicating whether to only log failures.
+	 */
+	public void setLogFailuresOnly(boolean logFailuresOnly) {
+		this.logFailuresOnly = logFailuresOnly;
+	}
+
+	// ---------------------------------- Life cycle --------------------------
+	
+	/**
 	 * Initializes the connection to Kafka.
 	 */
 	@Override
 	public void open(Configuration configuration) {
 		producer = new org.apache.kafka.clients.producer.KafkaProducer<>(this.producerConfig);
 
-		partitioner.open(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), partitions);
-
-		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), topicId);
+		RuntimeContext ctx = getRuntimeContext();
+		partitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), partitions);
+
+		LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into topic {}", 
+				ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks(), topicId);
+		
+		if (logFailuresOnly) {
+			callback = new Callback() {
+				
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception e) {
+					if (e != null) {
+						LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
+					}
+				}
+			};
+		}
+		else {
+			callback = new Callback() {
+				@Override
+				public void onCompletion(RecordMetadata metadata, Exception exception) {
+					if (exception != null && asyncException == null) {
+						asyncException = exception;
+					}
+				}
+			};
+		}
 	}
 
 	/**
@@ -192,27 +241,41 @@ public class FlinkKafkaProducer<IN> extends RichSinkFunction<IN>  {
 	 * 		The incoming data
 	 */
 	@Override
-	public void invoke(IN next) {
+	public void invoke(IN next) throws Exception {
+		// propagate asynchronous errors
+		checkErroneous();
+		
 		byte[] serialized = schema.serialize(next);
-
-		producer.send(new ProducerRecord<byte[], byte[]>(topicId,
-			partitioner.partition(next, partitions.length),
-			null,
-			serialized),
-			new ErrorLoggingCallback(topicId, null, serialized, false));
+		ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topicId,
+				partitioner.partition(next, partitions.length),
+				null, serialized);
+		
+		producer.send(record, callback);
 	}
 
 
 	@Override
-	public void close() {
+	public void close() throws Exception {
 		if (producer != null) {
 			producer.close();
 		}
+		
+		// make sure we propagate pending errors
+		checkErroneous();
 	}
 
 
 	// ----------------------------------- Utilities --------------------------
 
+	private void checkErroneous() throws Exception {
+		Exception e = asyncException;
+		if (e != null) {
+			// prevent double throwing
+			asyncException = null;
+			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+		}
+	}
+	
 	public static Properties getPropertiesFromBrokerList(String brokerList) {
 		String[] elements = brokerList.split(",");
 		for(String broker: elements) {

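The pattern above in distilled form: the Kafka producer fires the
Callback from its internal I/O thread, so the sink stores the first
failure in a transient volatile field and rethrows it from the task
thread on the next invoke() and on close(). A standalone sketch of that
hand-off (illustrative names, not Flink code):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class AsyncErrorPropagatingSink {

        private final KafkaProducer<byte[], byte[]> producer;

        // written by Kafka's I/O thread, read by the task thread
        private volatile Exception asyncException;

        // remembers the first async failure; later failures are dropped
        private final Callback callback = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null && asyncException == null) {
                    asyncException = e;
                }
            }
        };

        public AsyncErrorPropagatingSink(KafkaProducer<byte[], byte[]> producer) {
            this.producer = producer;
        }

        public void invoke(String topic, byte[] value) throws Exception {
            checkErroneous(); // surface errors from earlier sends first
            producer.send(new ProducerRecord<byte[], byte[]>(topic, value), callback);
        }

        public void close() throws Exception {
            producer.close();  // flushes pending records, firing their callbacks
            checkErroneous();  // surface errors from the final flush
        }

        private void checkErroneous() throws Exception {
            Exception e = asyncException;
            if (e != null) {
                asyncException = null; // prevent double throwing
                throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
            }
        }
    }
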
http://git-wip-us.apache.org/repos/asf/flink/blob/a8eeb3bb/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
new file mode 100644
index 0000000..c5c3387
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(FlinkKafkaProducer.class)
+public class KafkaProducerTest extends TestLogger {
+	
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testPropagateExceptions() {
+		try {
+			// mock kafka producer
+			KafkaProducer<?, ?> kafkaProducerMock = mock(KafkaProducer.class);
+			
+			// partition setup
+			when(kafkaProducerMock.partitionsFor(anyString())).thenReturn(
+					Arrays.asList(new PartitionInfo("mock_topic", 42, null, null, null)));
+
+			// failure when trying to send an element
+			when(kafkaProducerMock.send(any(ProducerRecord.class), any(Callback.class)))
+				.thenAnswer(new Answer<Future<RecordMetadata>>() {
+					@Override
+					public Future<RecordMetadata> answer(InvocationOnMock invocation) throws Throwable {
+						Callback callback = (Callback) invocation.getArguments()[1];
+						callback.onCompletion(null, new Exception("Test error"));
+						return null;
+					}
+				});
+			
+			// make sure the FlinkKafkaProducer instantiates our mock producer
+			whenNew(KafkaProducer.class).withAnyArguments().thenReturn(kafkaProducerMock);
+			
+			// (1) producer that propagates errors
+			
+			FlinkKafkaProducer<String> producerPropagating = new FlinkKafkaProducer<String>(
+					"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+
+			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
+			producerPropagating.open(new Configuration());
+			
+			try {
+				producerPropagating.invoke("value");
+				producerPropagating.invoke("value");
+				fail("This should fail with an exception");
+			}
+			catch (Exception e) {
+				assertNotNull(e.getCause());
+				assertNotNull(e.getCause().getMessage());
+				assertTrue(e.getCause().getMessage().contains("Test error"));
+			}
+
+			// (2) producer that only logs errors
+			
+			FlinkKafkaProducer<String> producerLogging = new FlinkKafkaProducer<String>(
+					"mock_topic", new JavaDefaultStringSchema(), new Properties(), null);
+			producerLogging.setLogFailuresOnly(true);
+			
+			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
+			producerLogging.open(new Configuration());
+
+			producerLogging.invoke("value");
+			producerLogging.invoke("value");
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}
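
Two details of the test are worth noting. PowerMock's whenNew(...) only
intercepts constructor calls made from classes listed in @PrepareForTest,
which is why the annotation names FlinkKafkaProducer (the caller of
"new KafkaProducer(...)"), not KafkaProducer itself. And the propagating
case needs two invoke() calls: the mocked send() completes the callback
synchronously, so the first call merely records the exception, and
checkErroneous() throws it at the start of the second call.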