You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/11/23 13:44:42 UTC
[3/3] flink git commit: [hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception
[hotfix][kafka] Throw FlinkKafkaProducer011Exception with error codes instead of generic Exception
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ac32c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ac32c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ac32c59
Branch: refs/heads/master
Commit: 2ac32c596bfaa833beefefd8de85c504e2d8d623
Parents: ccf917d
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed Nov 22 11:37:48 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 23 14:44:18 2017 +0100
----------------------------------------------------------------------
.../kafka/FlinkKafka011ErrorCode.java | 26 ++++++++++++
.../kafka/FlinkKafka011Exception.java | 42 ++++++++++++++++++++
.../connectors/kafka/FlinkKafkaProducer011.java | 22 +++++-----
3 files changed, 81 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
new file mode 100644
index 0000000..4f5de4f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011ErrorCode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+/**
+ * Error codes used in {@link FlinkKafka011Exception}; each constant names one category of failure reported by the Kafka 0.11 connector.
+ */
+public enum FlinkKafka011ErrorCode {
+ PRODUCERS_POOL_EMPTY, // no pooled producer and no free transactional id left ("Too many ongoing snapshots")
+ EXTERNAL_ERROR // a previously recorded exception from sending data to Kafka is being rethrown
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
new file mode 100644
index 0000000..6b16e53
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafka011Exception.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception used by {@link FlinkKafkaProducer011} and {@link FlinkKafkaConsumer011}; besides the usual message and optional cause it carries a {@link FlinkKafka011ErrorCode}.
+ */
+public class FlinkKafka011Exception extends FlinkException {
+
+ private final FlinkKafka011ErrorCode errorCode; // failure category; set once in the constructor and read via getErrorCode()
+
+ public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message) { // variant for failures without an underlying cause
+ super(message);
+ this.errorCode = errorCode;
+ }
+
+ public FlinkKafka011Exception(FlinkKafka011ErrorCode errorCode, String message, Throwable cause) { // variant that wraps an underlying cause
+ super(message, cause);
+ this.errorCode = errorCode;
+ }
+
+ public FlinkKafka011ErrorCode getErrorCode() { // returns the code passed at construction, unchanged
+ return errorCode;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/2ac32c59/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
index 6b0136d..0c741f5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java
@@ -551,7 +551,7 @@ public class FlinkKafkaProducer011<IN>
}
@Override
- public void invoke(KafkaTransactionState transaction, IN next, Context context) throws Exception {
+ public void invoke(KafkaTransactionState transaction, IN next, Context context) throws FlinkKafka011Exception {
checkErroneous();
byte[] serializedKey = schema.serializeKey(next);
@@ -587,7 +587,7 @@ public class FlinkKafkaProducer011<IN>
}
@Override
- public void close() throws Exception {
+ public void close() throws FlinkKafka011Exception {
final KafkaTransactionState currentTransaction = currentTransaction();
if (currentTransaction != null) {
// to avoid exceptions on aborting transactions with some pending records
@@ -612,7 +612,7 @@ public class FlinkKafkaProducer011<IN>
// ------------------- Logic for handling checkpoint flushing -------------------------- //
@Override
- protected KafkaTransactionState beginTransaction() throws Exception {
+ protected KafkaTransactionState beginTransaction() throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
FlinkKafkaProducer<byte[], byte[]> producer = createOrGetProducerFromPool();
@@ -631,12 +631,13 @@ public class FlinkKafkaProducer011<IN>
}
}
- private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws Exception {
+ private FlinkKafkaProducer<byte[], byte[]> createOrGetProducerFromPool() throws FlinkKafka011Exception {
FlinkKafkaProducer<byte[], byte[]> producer = getProducersPool().poll();
if (producer == null) {
String transactionalId = availableTransactionalIds.poll();
if (transactionalId == null) {
- throw new Exception(
+ throw new FlinkKafka011Exception(
+ FlinkKafka011ErrorCode.PRODUCERS_POOL_EMPTY,
"Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");
}
producer = initTransactionalProducer(transactionalId, true);
@@ -646,7 +647,7 @@ public class FlinkKafkaProducer011<IN>
}
@Override
- protected void preCommit(KafkaTransactionState transaction) throws Exception {
+ protected void preCommit(KafkaTransactionState transaction) throws FlinkKafka011Exception {
switch (semantic) {
case EXACTLY_ONCE:
case AT_LEAST_ONCE:
@@ -740,7 +741,7 @@ public class FlinkKafkaProducer011<IN>
* Flush pending records.
* @param transaction
*/
- private void flush(KafkaTransactionState transaction) throws Exception {
+ private void flush(KafkaTransactionState transaction) throws FlinkKafka011Exception {
if (transaction.producer != null) {
transaction.producer.flush();
}
@@ -936,12 +937,15 @@ public class FlinkKafkaProducer011<IN>
return producer;
}
- private void checkErroneous() throws Exception {
+ private void checkErroneous() throws FlinkKafka011Exception {
Exception e = asyncException;
if (e != null) {
// prevent double throwing
asyncException = null;
- throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+ throw new FlinkKafka011Exception(
+ FlinkKafka011ErrorCode.EXTERNAL_ERROR,
+ "Failed to send data to Kafka: " + e.getMessage(),
+ e);
}
}