You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:02 UTC

[45/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
deleted file mode 100644
index 1add30e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ConsumerCoordinatorNotAvailableException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
- * not yet been created.
- */
-public class ConsumerCoordinatorNotAvailableException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ConsumerCoordinatorNotAvailableException() {
-        super();
-    }
-
-    public ConsumerCoordinatorNotAvailableException(String message) {
-        super(message);
-    }
-
-    public ConsumerCoordinatorNotAvailableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ConsumerCoordinatorNotAvailableException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
deleted file mode 100644
index d5771df..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/CorruptRecordException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This exception indicates a record has failed it's internal CRC check, this generally indicates network or disk
- * corruption.
- */
-public class CorruptRecordException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public CorruptRecordException() {
-        super("This message has failed it's CRC checksum or is otherwise corrupt.");
-    }
-
-    public CorruptRecordException(String message) {
-        super(message);
-    }
-
-    public CorruptRecordException(Throwable cause) {
-        super(cause);
-    }
-
-    public CorruptRecordException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
deleted file mode 100644
index 1c048d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/DisconnectException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Server disconnected before a request could be completed.
- */
-public class DisconnectException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public DisconnectException() {
-        super();
-    }
-
-    public DisconnectException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public DisconnectException(String message) {
-        super(message);
-    }
-
-    public DisconnectException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
deleted file mode 100644
index ae46b5f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/IllegalGenerationException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class IllegalGenerationException extends RetriableException {
-    private static final long serialVersionUID = 1L;
-
-    public IllegalGenerationException() {
-        super();
-    }
-
-    public IllegalGenerationException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public IllegalGenerationException(String message) {
-        super(message);
-    }
-
-    public IllegalGenerationException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
deleted file mode 100644
index f7949f4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InterruptException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An unchecked wrapper for InterruptedException
- */
-public class InterruptException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-    
-    public InterruptException(InterruptedException cause) {
-        super(cause);
-        Thread.currentThread().interrupt();
-    }
-    
-    public InterruptException(String message, InterruptedException cause) {
-        super(message, cause);
-        Thread.currentThread().interrupt();
-    }
-
-    public InterruptException(String message) {
-        super(message, new InterruptedException());
-        Thread.currentThread().interrupt();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
deleted file mode 100644
index 710a391..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidMetadataException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An exception that may indicate the client's metadata is out of date
- */
-public abstract class InvalidMetadataException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public InvalidMetadataException() {
-        super();
-    }
-
-    public InvalidMetadataException(String message) {
-        super(message);
-    }
-
-    public InvalidMetadataException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public InvalidMetadataException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
deleted file mode 100644
index a3cd167..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidRequiredAcksException.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class InvalidRequiredAcksException extends ApiException {
-    private static final long serialVersionUID = 1L;
-
-    public InvalidRequiredAcksException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
deleted file mode 100644
index e0c5a41..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/InvalidTopicException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The client has attempted to perform an operation on an invalid topic.
- */
-public class InvalidTopicException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public InvalidTopicException() {
-        super();
-    }
-
-    public InvalidTopicException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public InvalidTopicException(String message) {
-        super(message);
-    }
-
-    public InvalidTopicException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
deleted file mode 100644
index 701a3c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/LeaderNotAvailableException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * There is no currently available leader for the given partition (either because a leadership election is in progress
- * or because all replicas are down).
- */
-public class LeaderNotAvailableException extends InvalidMetadataException {
-
-    private static final long serialVersionUID = 1L;
-
-    public LeaderNotAvailableException(String message) {
-        super(message);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
deleted file mode 100644
index ceca78f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NetworkException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A misc. network-related IOException occurred when making a request. This could be because the client's metadata is
- * out of date and it is making a request to a node that is now dead.
- */
-public class NetworkException extends InvalidMetadataException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NetworkException() {
-        super();
-    }
-
-    public NetworkException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NetworkException(String message) {
-        super(message);
-    }
-
-    public NetworkException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
deleted file mode 100644
index 3aea94b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotCoordinatorForConsumerException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
- * not a coordinator for.
- */
-public class NotCoordinatorForConsumerException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NotCoordinatorForConsumerException() {
-        super();
-    }
-
-    public NotCoordinatorForConsumerException(String message) {
-        super(message);
-    }
-
-    public NotCoordinatorForConsumerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NotCoordinatorForConsumerException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
deleted file mode 100644
index c2f8203..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasAfterAppendException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Number of insync replicas for the partition is lower than min.insync.replicas This exception is raised when the low
- * ISR size is discovered *after* the message was already appended to the log. Producer retries will cause duplicates.
- */
-public class NotEnoughReplicasAfterAppendException extends RetriableException {
-    private static final long serialVersionUID = 1L;
-
-    public NotEnoughReplicasAfterAppendException(String message) {
-        super(message);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
deleted file mode 100644
index 93eb850..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotEnoughReplicasException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Number of insync replicas for the partition is lower than min.insync.replicas
- */
-public class NotEnoughReplicasException extends RetriableException {
-    private static final long serialVersionUID = 1L;
-
-    public NotEnoughReplicasException() {
-        super();
-    }
-
-    public NotEnoughReplicasException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NotEnoughReplicasException(String message) {
-        super(message);
-    }
-
-    public NotEnoughReplicasException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
deleted file mode 100644
index a7dafb4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/NotLeaderForPartitionException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This server is not the leader for the given partition
- */
-public class NotLeaderForPartitionException extends InvalidMetadataException {
-
-    private static final long serialVersionUID = 1L;
-
-    public NotLeaderForPartitionException() {
-        super();
-    }
-
-    public NotLeaderForPartitionException(String message) {
-        super(message);
-    }
-
-    public NotLeaderForPartitionException(Throwable cause) {
-        super(cause);
-    }
-
-    public NotLeaderForPartitionException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
deleted file mode 100644
index f6a17b9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetLoadInProgressException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change
- * for that offsets topic partition).
- */
-public class OffsetLoadInProgressException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetLoadInProgressException() {
-        super();
-    }
-
-    public OffsetLoadInProgressException(String message) {
-        super(message);
-    }
-
-    public OffsetLoadInProgressException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public OffsetLoadInProgressException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
deleted file mode 100644
index 6a57b5d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetMetadataTooLarge.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The client has tried to save its offset with associated metadata larger than the maximum size allowed by the server.
- */
-public class OffsetMetadataTooLarge extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetMetadataTooLarge() {
-    }
-
-    public OffsetMetadataTooLarge(String message) {
-        super(message);
-    }
-
-    public OffsetMetadataTooLarge(Throwable cause) {
-        super(cause);
-    }
-
-    public OffsetMetadataTooLarge(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
deleted file mode 100644
index dfc35b7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/OffsetOutOfRangeException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This offset is either larger or smaller than the range of offsets the server has for the given partition.
- * 
- */
-public class OffsetOutOfRangeException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetOutOfRangeException() {
-    }
-
-    public OffsetOutOfRangeException(String message) {
-        super(message);
-    }
-
-    public OffsetOutOfRangeException(Throwable cause) {
-        super(cause);
-    }
-
-    public OffsetOutOfRangeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
deleted file mode 100644
index 360f042..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordBatchTooLargeException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This record batch is larger than the maximum allowable size
- */
-public class RecordBatchTooLargeException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RecordBatchTooLargeException() {
-        super();
-    }
-
-    public RecordBatchTooLargeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RecordBatchTooLargeException(String message) {
-        super(message);
-    }
-
-    public RecordBatchTooLargeException(Throwable cause) {
-        super(cause);
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
deleted file mode 100644
index 0fd5a5f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RecordTooLargeException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This record is larger than the maximum allowable size
- */
-public class RecordTooLargeException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RecordTooLargeException() {
-        super();
-    }
-
-    public RecordTooLargeException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RecordTooLargeException(String message) {
-        super(message);
-    }
-
-    public RecordTooLargeException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
deleted file mode 100644
index 419174f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/RetriableException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A retryable exception is a transient exception that if retried may succeed.
- */
-public abstract class RetriableException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public RetriableException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public RetriableException(String message) {
-        super(message);
-    }
-
-    public RetriableException(Throwable cause) {
-        super(cause);
-    }
-
-    public RetriableException() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
deleted file mode 100644
index 40f07fc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/SerializationException.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- *  Any exception during serialization in the producer
- */
-public class SerializationException extends KafkaException {
-
-    private static final long serialVersionUID = 1L;
-
-    public SerializationException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public SerializationException(String message) {
-        super(message);
-    }
-
-    public SerializationException(Throwable cause) {
-        super(cause);
-    }
-
-    public SerializationException() {
-        super();
-    }
-
-    /* avoid the expensive and useless stack trace for serialization exceptions */
-    @Override
-    public Throwable fillInStackTrace() {
-        return this;
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
deleted file mode 100644
index 4fd5a32..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/TimeoutException.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Indicates that a request timed out.
- */
-public class TimeoutException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public TimeoutException() {
-        super();
-    }
-
-    public TimeoutException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public TimeoutException(String message) {
-        super(message);
-    }
-
-    public TimeoutException(Throwable cause) {
-        super(cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
deleted file mode 100644
index a86997c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownConsumerIdException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class UnknownConsumerIdException extends RetriableException {
-    private static final long serialVersionUID = 1L;
-
-    public UnknownConsumerIdException() {
-        super();
-    }
-
-    public UnknownConsumerIdException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public UnknownConsumerIdException(String message) {
-        super(message);
-    }
-
-    public UnknownConsumerIdException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
deleted file mode 100644
index 423e8d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownServerException.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * An error occurred on the server for which the client doesn't have a corresponding error code. This is generally an
- * unexpected error.
- * 
- */
-public class UnknownServerException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public UnknownServerException() {
-    }
-
-    public UnknownServerException(String message) {
-        super(message);
-    }
-
-    public UnknownServerException(Throwable cause) {
-        super(cause);
-    }
-
-    public UnknownServerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
deleted file mode 100644
index 2b1a733..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/UnknownTopicOrPartitionException.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This topic/partition doesn't exist
- */
-public class UnknownTopicOrPartitionException extends InvalidMetadataException {
-
-    private static final long serialVersionUID = 1L;
-
-    public UnknownTopicOrPartitionException() {
-    }
-
-    public UnknownTopicOrPartitionException(String message) {
-        super(message);
-    }
-
-    public UnknownTopicOrPartitionException(Throwable throwable) {
-        super(throwable);
-    }
-
-    public UnknownTopicOrPartitionException(String message, Throwable throwable) {
-        super(message, throwable);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
deleted file mode 100644
index 984e41e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/CompoundStat.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.MetricName;
-
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A compound stat is a stat where a single measurement and associated data structure feeds many metrics. This is the
- * example for a histogram which has many associated percentiles.
- */
-public interface CompoundStat extends Stat {
-
-    public List<NamedMeasurable> stats();
-
-    public static class NamedMeasurable {
-
-        private final MetricName name;
-        private final Measurable stat;
-
-        public NamedMeasurable(MetricName name, Measurable stat) {
-            super();
-            this.name = name;
-            this.stat = stat;
-        }
-
-        public MetricName name() {
-            return name;
-        }
-
-        public Measurable stat() {
-            return stat;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
deleted file mode 100644
index 5360efa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/JmxReporter.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.Attribute;
-import javax.management.AttributeList;
-import javax.management.AttributeNotFoundException;
-import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
-import javax.management.JMException;
-import javax.management.MBeanAttributeInfo;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Register metrics in JMX as dynamic mbeans based on the metric names
- */
-public class JmxReporter implements MetricsReporter {
-
-    private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
-    private static final Object LOCK = new Object();
-    private String prefix;
-    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
-
-    public JmxReporter() {
-        this("");
-    }
-
-    /**
-     * Create a JMX reporter that prefixes all metrics with the given string.
-     */
-    public JmxReporter(String prefix) {
-        this.prefix = prefix;
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs) {}
-
-    @Override
-    public void init(List<KafkaMetric> metrics) {
-        synchronized (LOCK) {
-            for (KafkaMetric metric : metrics)
-                addAttribute(metric);
-            for (KafkaMbean mbean : mbeans.values())
-                reregister(mbean);
-        }
-    }
-
-    @Override
-    public void metricChange(KafkaMetric metric) {
-        synchronized (LOCK) {
-            KafkaMbean mbean = addAttribute(metric);
-            reregister(mbean);
-        }
-    }
-
-    private KafkaMbean addAttribute(KafkaMetric metric) {
-        try {
-            MetricName metricName = metric.metricName();
-            String mBeanName = getMBeanName(metricName);
-            if (!this.mbeans.containsKey(mBeanName))
-                mbeans.put(mBeanName, new KafkaMbean(mBeanName));
-            KafkaMbean mbean = this.mbeans.get(mBeanName);
-            mbean.setAttribute(metricName.name(), metric);
-            return mbean;
-        } catch (JMException e) {
-            throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
-        }
-    }
-
-    /**
-     * @param metricName
-     * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
-     */
-    private String getMBeanName(MetricName metricName) {
-        StringBuilder mBeanName = new StringBuilder();
-        mBeanName.append(prefix);
-        mBeanName.append(":type=");
-        mBeanName.append(metricName.group());
-        for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
-            if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
-                continue;
-            mBeanName.append(",");
-            mBeanName.append(entry.getKey());
-            mBeanName.append("=");
-            mBeanName.append(entry.getValue());
-        }
-        return mBeanName.toString();
-    }
-
-    public void close() {
-        synchronized (LOCK) {
-            for (KafkaMbean mbean : this.mbeans.values())
-                unregister(mbean);
-        }
-    }
-
-    private void unregister(KafkaMbean mbean) {
-        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
-        try {
-            if (server.isRegistered(mbean.name()))
-                server.unregisterMBean(mbean.name());
-        } catch (JMException e) {
-            throw new KafkaException("Error unregistering mbean", e);
-        }
-    }
-
-    private void reregister(KafkaMbean mbean) {
-        unregister(mbean);
-        try {
-            ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name());
-        } catch (JMException e) {
-            throw new KafkaException("Error registering mbean " + mbean.name(), e);
-        }
-    }
-
-    private static class KafkaMbean implements DynamicMBean {
-        private final ObjectName objectName;
-        private final Map<String, KafkaMetric> metrics;
-
-        public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
-            this.metrics = new HashMap<String, KafkaMetric>();
-            this.objectName = new ObjectName(mbeanName);
-        }
-
-        public ObjectName name() {
-            return objectName;
-        }
-
-        public void setAttribute(String name, KafkaMetric metric) {
-            this.metrics.put(name, metric);
-        }
-
-        @Override
-        public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
-            if (this.metrics.containsKey(name))
-                return this.metrics.get(name).value();
-            else
-                throw new AttributeNotFoundException("Could not find attribute " + name);
-        }
-
-        @Override
-        public AttributeList getAttributes(String[] names) {
-            try {
-                AttributeList list = new AttributeList();
-                for (String name : names)
-                    list.add(new Attribute(name, getAttribute(name)));
-                return list;
-            } catch (Exception e) {
-                log.error("Error getting JMX attribute: ", e);
-                return new AttributeList();
-            }
-        }
-
-        @Override
-        public MBeanInfo getMBeanInfo() {
-            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
-            int i = 0;
-            for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
-                String attribute = entry.getKey();
-                KafkaMetric metric = entry.getValue();
-                attrs[i] = new MBeanAttributeInfo(attribute,
-                                                  double.class.getName(),
-                                                  metric.metricName().description(),
-                                                  true,
-                                                  false,
-                                                  false);
-                i += 1;
-            }
-            return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
-        }
-
-        @Override
-        public Object invoke(String name, Object[] params, String[] sig) throws MBeanException, ReflectionException {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-        @Override
-        public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
-                InvalidAttributeValueException,
-                MBeanException,
-                ReflectionException {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-        @Override
-        public AttributeList setAttributes(AttributeList list) {
-            throw new UnsupportedOperationException("Set not allowed.");
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
deleted file mode 100644
index 6245e79..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/KafkaMetric.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-import org.apache.flink.kafka_backport.common.Metric;
-import org.apache.flink.kafka_backport.common.MetricName;
-import org.apache.flink.kafka_backport.common.utils.Time;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public final class KafkaMetric implements Metric {
-
-    private MetricName metricName;
-    private final Object lock;
-    private final Time time;
-    private final Measurable measurable;
-    private MetricConfig config;
-
-    KafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
-        super();
-        this.metricName = metricName;
-        this.lock = lock;
-        this.measurable = measurable;
-        this.config = config;
-        this.time = time;
-    }
-
-    MetricConfig config() {
-        return this.config;
-    }
-
-    @Override
-    public MetricName metricName() {
-        return this.metricName;
-    }
-
-    @Override
-    public double value() {
-        synchronized (this.lock) {
-            return value(time.milliseconds());
-        }
-    }
-
-    double value(long timeMs) {
-        return this.measurable.measure(config, timeMs);
-    }
-
-    public void config(MetricConfig config) {
-        synchronized (lock) {
-            this.config = config;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
deleted file mode 100644
index 08ed823..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/metrics/Measurable.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.metrics;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A measurable quantity that can be registered as a metric
- */
-public interface Measurable {
-
-    /**
-     * Measure this quantity and return the result as a double
-     * @param config The configuration for this metric
-     * @param now The POSIX time in milliseconds the measurement is being taken
-     * @return The measured value
-     */
-    public double measure(MetricConfig config, long now);
-
-}