You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/23 13:44:42 UTC

[3/3] flink git commit: [hotfix][kafka] Throw FlinkKafka011Exception with error codes instead of generic Exception

[hotfix][kafka] Throw FlinkKafka011Exception with error codes instead of generic Exception


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac32c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac32c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac32c59

Branch: refs/heads/master
Commit: 2ac32c596bfaa833beefefd8de85c504e2d8d623
Parents: ccf917d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100

----------------------------------------------------------------------
 .../kafka/FlinkKafka011ErrorCode.java           | 26 ++++++++++++
 .../kafka/FlinkKafka011Exception.java           | 42 ++++++++++++++++++++
 .../connectors/kafka/FlinkKafkaProducer011.java | 22 +++++-----
 3 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 0000000..4f5de4f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception} to distinguish an exhausted producers pool from an error reported while sending data to Kafka.
+ */
+public enum FlinkKafka011ErrorCode {
+	PRODUCERS_POOL_EMPTY,
+	EXTERNAL_ERROR
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 0000000..6b16e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}; it carries a {@link FlinkKafka011ErrorCode} identifying the kind of failure.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+	private final FlinkKafka011ErrorCode errorCode;
+
+	public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message) {
+		super(message);
+		this.errorCode = errorCode;
+	}
+
+	public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause) {
+		super(message, cause);
+		this.errorCode = errorCode;
+	}
+
+	public FlinkKafka011ErrorCode getErrorCode() {
+		return errorCode;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6b0136d..0c741f5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -551,7 +551,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception {
+	public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
 		checkErroneous();
 
 		byte[] serializedKey = schema.serializeKey(next);
@@ -587,7 +587,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	public void close() throws Exception {
+	public void close() throws FlinkKafka011Exception {
 		final KafkaTransactionState currentTransaction = currentTransaction();
 		if (currentTransaction != null) {
 			// to avoid exceptions on aborting transactions with some pending records
@@ -612,7 +612,7 @@ public class FlinkKafkaProducer011<IN>
 	// ------------------- Logic for handling checkpoint flushing -------------------------- //
 
 	@Override
-	protected KafkaTransactionState beginTransaction() throws Exception {
+	protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 				FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
@@ -631,12 +631,13 @@ public class FlinkKafkaProducer011<IN>
 		}
 	}
 
-	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
+	private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
 		FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
 		if (producer == null) {
 			String transactionalId = availableTransactionalIds.poll();
 			if (transactionalId == null) {
-				throw new Exception(
+				throw new FlinkKafka011Exception(
+					FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
 					"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
 			}
 			producer = initTransactionalProducer(transactionalId, true);
@@ -646,7 +647,7 @@ public class FlinkKafkaProducer011<IN>
 	}
 
 	@Override
-	protected void preCommit(KafkaTransactionState transaction) throws Exception {
+	protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		switch (semantic) {
 			case EXACTLY_ONCE:
 			case AT_LEAST_ONCE:
@@ -740,7 +741,7 @@ public class FlinkKafkaProducer011<IN>
 	 * Flush pending records.
 	 * @param transaction
 	 */
-	private void flush(KafkaTransactionState transaction) throws Exception {
+	private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception {
 		if (transaction.producer != null) {
 			transaction.producer.flush();
 		}
@@ -936,12 +937,15 @@ public class FlinkKafkaProducer011<IN>
 		return producer;
 	}
 
-	private void checkErroneous() throws Exception {
+	private void checkErroneous() throws FlinkKafka011Exception {
 		Exception e = asyncException;
 		if (e != null) {
 			// prevent double throwing
 			asyncException = null;
-			throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+			throw new FlinkKafka011Exception(
+				FlinkKafka011ErrorCode.EXTERNAL_ERROR,
+				"Failed to send data to Kafka: " + e.getMessage(),
+				e);
 		}
 	}