You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:36 UTC
[19/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
new file mode 100644
index 0000000..1add30e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error corresponding to the error code a broker returns for consumer metadata requests or offset commit
+ * requests while the offsets topic has not yet been created.
+ */
+public class ConsumerCoordinatorNotAvailableException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code RetriableException} constructor. */
+    public ConsumerCoordinatorNotAvailableException() {
+        super();
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public ConsumerCoordinatorNotAvailableException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public ConsumerCoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
new file mode 100644
index 0000000..d5771df
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error indicating that a record has failed its internal CRC check. This generally indicates network or
+ * disk corruption.
+ */
+public class CorruptRecordException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception with a fixed default detail message describing the CRC failure.
+     */
+    public CorruptRecordException() {
+        // Possessive "its" (not the contraction "it's") - this text is surfaced to users in logs and stack traces.
+        super("This message has failed its CRC checksum or is otherwise corrupt.");
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public CorruptRecordException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public CorruptRecordException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public CorruptRecordException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
new file mode 100644
index 0000000..1c048d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error raised when the server disconnected before a request could be completed.
+ */
+public class DisconnectException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code RetriableException} constructor. */
+    public DisconnectException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public DisconnectException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public DisconnectException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public DisconnectException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
new file mode 100644
index 0000000..ae46b5f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable exception type; every constructor simply delegates to the matching {@code RetriableException}
+ * constructor.
+ *
+ * NOTE(review): the name suggests a request carried a generation that is not (or no longer) valid, but the exact
+ * trigger is not described in this file - confirm against the upstream Kafka sources.
+ */
+public class IllegalGenerationException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code RetriableException} constructor. */
+    public IllegalGenerationException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public IllegalGenerationException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public IllegalGenerationException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public IllegalGenerationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
new file mode 100644
index 0000000..f7949f4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * An unchecked wrapper for {@link InterruptedException}.
+ *
+ * Every constructor calls {@code Thread.currentThread().interrupt()} after delegating to the superclass, so the
+ * thread that creates this exception has its interrupt flag set again.
+ */
+public class InterruptException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds the exception from a message alone; a newly created {@link InterruptedException} is supplied as cause.
+     *
+     * @param message the detail message
+     */
+    public InterruptException(String message) {
+        super(message, new InterruptedException());
+        Thread.currentThread().interrupt();
+    }
+
+    /**
+     * Wraps the given interruption.
+     *
+     * @param cause the interruption being wrapped
+     */
+    public InterruptException(InterruptedException cause) {
+        super(cause);
+        Thread.currentThread().interrupt();
+    }
+
+    /**
+     * Wraps the given interruption together with a detail message.
+     *
+     * @param message the detail message
+     * @param cause the interruption being wrapped
+     */
+    public InterruptException(String message, InterruptedException cause) {
+        super(message, cause);
+        Thread.currentThread().interrupt();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
new file mode 100644
index 0000000..710a391
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Abstract base for retriable exceptions that may indicate the client's metadata is out of date.
+ */
+public abstract class InvalidMetadataException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code RetriableException} constructor. */
+    public InvalidMetadataException() {
+        super();
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public InvalidMetadataException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public InvalidMetadataException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public InvalidMetadataException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
new file mode 100644
index 0000000..a3cd167
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * {@code ApiException} subtype that can only be created with a detail message.
+ *
+ * NOTE(review): the name suggests it reports an invalid "required acks" value, but the exact trigger is not
+ * described in this file - confirm against the upstream Kafka sources.
+ */
+public class InvalidRequiredAcksException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public InvalidRequiredAcksException(String message) {
+        super(message);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
new file mode 100644
index 0000000..e0c5a41
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals that the client has attempted to perform an operation on an invalid topic.
+ */
+public class InvalidTopicException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code ApiException} constructor. */
+    public InvalidTopicException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public InvalidTopicException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public InvalidTopicException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public InvalidTopicException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
new file mode 100644
index 0000000..701a3c6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Raised when the given partition currently has no available leader: either a leadership election is in progress
+ * or all replicas are down.
+ */
+public class LeaderNotAvailableException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public LeaderNotAvailableException(String message) {
+        super(message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
new file mode 100644
index 0000000..ceca78f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A miscellaneous network-related IOException occurred while making a request. One possible reason is that the
+ * client's metadata is out of date and the request went to a node that is now dead.
+ */
+public class NetworkException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code InvalidMetadataException} constructor. */
+    public NetworkException() {
+        super();
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public NetworkException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public NetworkException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public NetworkException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
new file mode 100644
index 0000000..3aea94b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error corresponding to the error code a broker returns when it receives an offset fetch or commit
+ * request for a consumer group that it is not a coordinator for.
+ */
+public class NotCoordinatorForConsumerException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Forwards to the no-argument {@code RetriableException} constructor. */
+    public NotCoordinatorForConsumerException() {
+        super();
+    }
+
+    /**
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public NotCoordinatorForConsumerException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     */
+    public NotCoordinatorForConsumerException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message, handed to the superclass unchanged
+     * @param cause the underlying throwable, handed to the superclass unchanged
+     */
+    public NotCoordinatorForConsumerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
new file mode 100644
index 0000000..c2f8203
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Raised when the number of in-sync replicas of a partition turns out to be below min.insync.replicas only *after*
+ * the message has already been appended to the log; a producer retry will therefore cause duplicates.
+ */
+public class NotEnoughReplicasAfterAppendException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public NotEnoughReplicasAfterAppendException(String message) {
+        super(message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
new file mode 100644
index 0000000..93eb850
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Raised when a partition has fewer in-sync replicas than min.insync.replicas.
+ */
+public class NotEnoughReplicasException extends RetriableException {
+    private static final long serialVersionUID = 1L;
+
+    public NotEnoughReplicasException(String message) {
+        super(message);
+    }
+
+    public NotEnoughReplicasException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotEnoughReplicasException(Throwable cause) {
+        super(cause);
+    }
+
+    public NotEnoughReplicasException() {
+        super();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
new file mode 100644
index 0000000..a7dafb4
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals that the server which received the request is not the leader of the addressed partition.
+ */
+public class NotLeaderForPartitionException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotLeaderForPartitionException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NotLeaderForPartitionException(Throwable cause) {
+        super(cause);
+    }
+
+    public NotLeaderForPartitionException(String message) {
+        super(message);
+    }
+
+    public NotLeaderForPartitionException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
new file mode 100644
index 0000000..f6a17b9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error a broker reports for an offset fetch request while it is still loading offsets, which happens after
+ * a leader change for the affected offsets topic partition.
+ */
+public class OffsetLoadInProgressException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetLoadInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OffsetLoadInProgressException(String message) {
+        super(message);
+    }
+
+    public OffsetLoadInProgressException(Throwable cause) {
+        super(cause);
+    }
+
+    public OffsetLoadInProgressException() {
+        super();
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
new file mode 100644
index 0000000..6a57b5d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Thrown when the metadata a client attaches to an offset it wants to save exceeds the maximum size the server allows.
+ */
+public class OffsetMetadataTooLarge extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetMetadataTooLarge(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OffsetMetadataTooLarge(Throwable cause) {
+        super(cause);
+    }
+
+    public OffsetMetadataTooLarge(String message) {
+        super(message);
+    }
+
+    public OffsetMetadataTooLarge() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
new file mode 100644
index 0000000..dfc35b7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Indicates that the requested offset lies outside the range of offsets the server has for the given partition,
+ * i.e. it is either smaller or larger than that range.
+ */
+public class OffsetOutOfRangeException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetOutOfRangeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public OffsetOutOfRangeException(Throwable cause) {
+        super(cause);
+    }
+
+    public OffsetOutOfRangeException(String message) {
+        super(message);
+    }
+
+    public OffsetOutOfRangeException() {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
new file mode 100644
index 0000000..360f042
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals that a record batch exceeds the maximum allowable size.
+ */
+public class RecordBatchTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RecordBatchTooLargeException(String message) {
+        super(message);
+    }
+
+    public RecordBatchTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+    public RecordBatchTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RecordBatchTooLargeException() {
+        super();
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
new file mode 100644
index 0000000..0fd5a5f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals that a single record exceeds the maximum allowable size.
+ */
+public class RecordTooLargeException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RecordTooLargeException(String message) {
+        super(message);
+    }
+
+    public RecordTooLargeException(Throwable cause) {
+        super(cause);
+    }
+
+    public RecordTooLargeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RecordTooLargeException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
new file mode 100644
index 0000000..419174f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Base class for transient errors: an operation that failed with a retriable exception may succeed when retried.
+ */
+public abstract class RetriableException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetriableException() {
+    }
+
+    public RetriableException(String message) {
+        super(message);
+    }
+
+    public RetriableException(Throwable cause) {
+        super(cause);
+    }
+
+    public RetriableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
new file mode 100644
index 0000000..40f07fc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals any failure that occurs during serialization in the producer.
+ */
+public class SerializationException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public SerializationException() {
+        super();
+    }
+
+    public SerializationException(String message) {
+        super(message);
+    }
+
+    public SerializationException(Throwable cause) {
+        super(cause);
+    }
+
+    public SerializationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /* Serialization errors skip stack trace capture: it is costly and of no use for them. */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
new file mode 100644
index 0000000..4fd5a32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Retriable error indicating that a request timed out.
+ */
+public class TimeoutException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public TimeoutException(String message) {
+        super(message);
+    }
+
+    public TimeoutException(Throwable cause) {
+        super(cause);
+    }
+
+    public TimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public TimeoutException() {
+        super();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
new file mode 100644
index 0000000..a86997c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class UnknownConsumerIdException extends RetriableException {
+ private static final long serialVersionUID = 1L;
+
+ public UnknownConsumerIdException() {
+ super();
+ }
+
+ public UnknownConsumerIdException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UnknownConsumerIdException(String message) {
+ super(message);
+ }
+
+ public UnknownConsumerIdException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
new file mode 100644
index 0000000..423e8d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals a server-side error for which the client has no matching error code. Seeing this exception generally
+ * means something unexpected happened on the server.
+ */
+public class UnknownServerException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception with neither a detail message nor a cause.
+     */
+    public UnknownServerException() {
+        super();
+    }
+
+    /**
+     * Creates the exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param cause the underlying cause
+     */
+    public UnknownServerException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public UnknownServerException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception wrapping an underlying cause.
+     *
+     * @param cause the underlying cause
+     */
+    public UnknownServerException(Throwable cause) {
+        super(cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
new file mode 100644
index 0000000..2b1a733
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Signals that the referenced topic or partition does not exist. It is a kind of
+ * {@link InvalidMetadataException}.
+ */
+public class UnknownTopicOrPartitionException extends InvalidMetadataException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates the exception with neither a detail message nor a cause.
+     */
+    public UnknownTopicOrPartitionException() {
+        super();
+    }
+
+    /**
+     * Creates the exception with both a detail message and an underlying cause.
+     *
+     * @param message the detail message
+     * @param throwable the underlying cause
+     */
+    public UnknownTopicOrPartitionException(String message, Throwable throwable) {
+        super(message, throwable);
+    }
+
+    /**
+     * Creates the exception with a detail message.
+     *
+     * @param message the detail message
+     */
+    public UnknownTopicOrPartitionException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates the exception wrapping an underlying cause.
+     *
+     * @param throwable the underlying cause
+     */
+    public UnknownTopicOrPartitionException(Throwable throwable) {
+        super(throwable);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
new file mode 100644
index 0000000..984e41e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.metrics;
+
+import org.apache.flink.kafka_backport.common.MetricName;
+
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A stat in which one measurement, together with its backing data structure, feeds several metrics at once. A
+ * histogram is the typical example: a single recorded value contributes to many percentile metrics.
+ */
+public interface CompoundStat extends Stat {
+
+    /**
+     * @return the named measurables exposed by this compound stat
+     */
+    List<NamedMeasurable> stats();
+
+    /**
+     * Immutable pairing of a {@link MetricName} with the {@link Measurable} that is published under that name.
+     */
+    class NamedMeasurable {
+
+        private final MetricName name;
+        private final Measurable stat;
+
+        /**
+         * @param name the metric name the measurable is published under
+         * @param stat the measurable that produces the value
+         */
+        public NamedMeasurable(MetricName name, Measurable stat) {
+            this.stat = stat;
+            this.name = name;
+        }
+
+        /**
+         * @return the measurable of this pair
+         */
+        public Measurable stat() {
+            return this.stat;
+        }
+
+        /**
+         * @return the metric name of this pair
+         */
+        public MetricName name() {
+            return this.name;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
new file mode 100644
index 0000000..5360efa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.metrics;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.JMException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Registers metrics in JMX as dynamic mbeans based on the metric names.
+ *
+ * Metrics are grouped into one mbean per (prefix, group, tags) combination, see
+ * {@link #getMBeanName(MetricName)}; inside an mbean every metric is exposed as a read-only attribute named
+ * after {@code MetricName.name()}.
+ */
+public class JmxReporter implements MetricsReporter {
+
+    private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
+
+    // Static, hence shared by every reporter instance: all (re-)registrations with the platform MBean server
+    // performed by this class are serialized through this single lock.
+    private static final Object LOCK = new Object();
+
+    private final String prefix;
+    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
+
+    /**
+     * Create a JMX reporter with an empty prefix.
+     */
+    public JmxReporter() {
+        this("");
+    }
+
+    /**
+     * Create a JMX reporter that prefixes all metrics with the given string.
+     *
+     * @param prefix the string placed in front of {@code ":type="} in every mbean name
+     */
+    public JmxReporter(String prefix) {
+        this.prefix = prefix;
+    }
+
+    /**
+     * No-op: this reporter reads nothing from the configuration.
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {}
+
+    /**
+     * Adds all given metrics as mbean attributes and then (re-)registers every mbean known to this reporter.
+     *
+     * @param metrics the metrics to expose
+     * @throws KafkaException if an mbean cannot be created, unregistered or registered
+     */
+    @Override
+    public void init(List<KafkaMetric> metrics) {
+        synchronized (LOCK) {
+            for (KafkaMetric metric : metrics)
+                addAttribute(metric);
+            for (KafkaMbean mbean : mbeans.values())
+                reregister(mbean);
+        }
+    }
+
+    /**
+     * Adds (or replaces) the attribute for the given metric and re-registers the mbean that owns it.
+     *
+     * @param metric the metric that was added or changed
+     * @throws KafkaException if the mbean cannot be created, unregistered or registered
+     */
+    @Override
+    public void metricChange(KafkaMetric metric) {
+        synchronized (LOCK) {
+            KafkaMbean mbean = addAttribute(metric);
+            reregister(mbean);
+        }
+    }
+
+    /**
+     * Stores the metric as an attribute of the mbean it belongs to, creating that mbean on first use.
+     *
+     * @param metric the metric to add
+     * @return the mbean that now holds the metric
+     * @throws KafkaException if the derived mbean name is rejected by JMX
+     */
+    private KafkaMbean addAttribute(KafkaMetric metric) {
+        try {
+            MetricName metricName = metric.metricName();
+            String mBeanName = getMBeanName(metricName);
+            KafkaMbean mbean = this.mbeans.get(mBeanName);
+            if (mbean == null) {
+                mbean = new KafkaMbean(mBeanName);
+                this.mbeans.put(mBeanName, mbean);
+            }
+            mbean.setAttribute(metricName.name(), metric);
+            return mbean;
+        } catch (JMException e) {
+            throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
+        }
+    }
+
+    /**
+     * Builds the mbean name for a metric. Tags whose key or value is empty are left out.
+     *
+     * @param metricName the metric whose group and tags determine the mbean name
+     * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
+     */
+    private String getMBeanName(MetricName metricName) {
+        StringBuilder mBeanName = new StringBuilder();
+        mBeanName.append(prefix);
+        mBeanName.append(":type=");
+        mBeanName.append(metricName.group());
+        for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
+            if (entry.getKey().isEmpty() || entry.getValue().isEmpty())
+                continue;
+            mBeanName.append(",");
+            mBeanName.append(entry.getKey());
+            mBeanName.append("=");
+            mBeanName.append(entry.getValue());
+        }
+        return mBeanName.toString();
+    }
+
+    /**
+     * Unregisters every mbean created by this reporter from the platform MBean server.
+     *
+     * @throws KafkaException if an mbean cannot be unregistered
+     */
+    public void close() {
+        synchronized (LOCK) {
+            for (KafkaMbean mbean : this.mbeans.values())
+                unregister(mbean);
+        }
+    }
+
+    /**
+     * Removes the mbean from the platform MBean server if it is currently registered.
+     */
+    private void unregister(KafkaMbean mbean) {
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            if (server.isRegistered(mbean.name()))
+                server.unregisterMBean(mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error unregistering mbean", e);
+        }
+    }
+
+    /**
+     * Unregisters the mbean (if present) and registers it again with the platform MBean server.
+     */
+    private void reregister(KafkaMbean mbean) {
+        unregister(mbean);
+        try {
+            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
+        } catch (JMException e) {
+            throw new KafkaException("Error registering mbean " + mbean.name(), e);
+        }
+    }
+
+    /**
+     * Dynamic mbean exposing a set of metrics as read-only attributes of type {@code double}.
+     *
+     * NOTE(review): the metrics map is filled from code running under the reporter's LOCK, but the JMX
+     * callbacks below read it without taking that lock - confirm this is acceptable for the callers.
+     */
+    private static class KafkaMbean implements DynamicMBean {
+        private final ObjectName objectName;
+        private final Map<String, KafkaMetric> metrics;
+
+        /**
+         * @param mbeanName the JMX object name, in string form, of this mbean
+         * @throws MalformedObjectNameException if the string is not a valid object name
+         */
+        public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
+            this.metrics = new HashMap<String, KafkaMetric>();
+            this.objectName = new ObjectName(mbeanName);
+        }
+
+        public ObjectName name() {
+            return objectName;
+        }
+
+        /**
+         * Adds the metric as an attribute, replacing any metric previously stored under the same name.
+         */
+        public void setAttribute(String name, KafkaMetric metric) {
+            this.metrics.put(name, metric);
+        }
+
+        /**
+         * @return the current value of the metric stored under the given attribute name
+         * @throws AttributeNotFoundException if no metric is stored under that name
+         */
+        @Override
+        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
+            KafkaMetric metric = this.metrics.get(name);
+            if (metric == null)
+                throw new AttributeNotFoundException("Could not find attribute " + name);
+            return metric.value();
+        }
+
+        /**
+         * Reads several attributes at once. An attribute that cannot be read is logged and left out of the
+         * result; it no longer causes the attributes that were read successfully to be dropped as well.
+         *
+         * @param names the attribute names to read
+         * @return the attributes that could be read, in request order
+         */
+        @Override
+        public AttributeList getAttributes(String[] names) {
+            AttributeList list = new AttributeList();
+            if (names == null)
+                return list;
+            for (String name : names) {
+                try {
+                    list.add(new Attribute(name, getAttribute(name)));
+                } catch (Exception e) {
+                    // Skip only the failing attribute so one bad metric does not hide all the others.
+                    log.error("Error getting JMX attribute: " + name, e);
+                }
+            }
+            return list;
+        }
+
+        /**
+         * Describes every stored metric as a readable, non-writable attribute of type {@code double}, using the
+         * metric's description as the attribute description.
+         */
+        @Override
+        public MBeanInfo getMBeanInfo() {
+            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
+            int i = 0;
+            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
+                String attribute = entry.getKey();
+                KafkaMetric metric = entry.getValue();
+                attrs[i] = new MBeanAttributeInfo(attribute,
+                                                  double.class.getName(),
+                                                  metric.metricName().description(),
+                                                  true,
+                                                  false,
+                                                  false);
+                i += 1;
+            }
+            return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
+        }
+
+        /**
+         * Operations are not supported by this mbean.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
+            throw new UnsupportedOperationException("Invoke not allowed.");
+        }
+
+        /**
+         * Attributes are read-only.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
+                                                     InvalidAttributeValueException,
+                                                     MBeanException,
+                                                     ReflectionException {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+        /**
+         * Attributes are read-only.
+         *
+         * @throws UnsupportedOperationException always
+         */
+        @Override
+        public AttributeList setAttributes(AttributeList list) {
+            throw new UnsupportedOperationException("Set not allowed.");
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
new file mode 100644
index 0000000..6245e79
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.metrics;
+
+import org.apache.flink.kafka_backport.common.Metric;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.utils.Time;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A {@link Metric} whose value is produced on demand by a {@link Measurable}, evaluated against a
+ * {@link MetricConfig} and a timestamp taken from the supplied {@link Time}.
+ *
+ * {@link #value()} and {@link #config(MetricConfig)} synchronize on the lock object handed to the constructor;
+ * the package-private accessors {@link #config()} and {@link #value(long)} do not take that lock themselves.
+ */
+public final class KafkaMetric implements Metric {
+
+    private MetricName metricName;
+    private final Object lock;
+    private final Time time;
+    private final Measurable measurable;
+    private MetricConfig config;
+
+    /**
+     * @param lock the monitor guarding {@link #value()} and {@link #config(MetricConfig)}
+     * @param metricName the name of this metric
+     * @param measurable the quantity that produces the metric's value
+     * @param config the configuration handed to the measurable on every measurement
+     * @param time the clock used by {@link #value()} to obtain the current time
+     */
+    KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
+        this.lock = lock;
+        this.time = time;
+        this.metricName = metricName;
+        this.measurable = measurable;
+        this.config = config;
+    }
+
+    /**
+     * @return the configuration currently in use (read without taking the lock)
+     */
+    MetricConfig config() {
+        return config;
+    }
+
+    /**
+     * Replaces the configuration while holding the lock.
+     *
+     * @param config the new configuration
+     */
+    public void config(MetricConfig config) {
+        synchronized (this.lock) {
+            this.config = config;
+        }
+    }
+
+    @Override
+    public MetricName metricName() {
+        return metricName;
+    }
+
+    /**
+     * Measures the metric at the current time of the configured clock, while holding the lock.
+     *
+     * @return the measured value
+     */
+    @Override
+    public double value() {
+        synchronized (lock) {
+            final long nowMs = this.time.milliseconds();
+            return value(nowMs);
+        }
+    }
+
+    /**
+     * Measures the metric at the given time; this method does not take the lock itself.
+     *
+     * @param timeMs the time, in milliseconds, passed to the measurable
+     * @return the measured value
+     */
+    double value(long timeMs) {
+        return measurable.measure(this.config, timeMs);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
new file mode 100644
index 0000000..08ed823
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.metrics;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A quantity that can be measured and therefore registered as a metric.
+ */
+public interface Measurable {
+
+    /**
+     * Takes a measurement of this quantity.
+     *
+     * @param config The configuration for this metric
+     * @param now The POSIX time in milliseconds the measurement is being taken
+     * @return The measured value, as a double
+     */
+    double measure(MetricConfig config, long now);
+
+}