You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:26:03 UTC
[46/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
deleted file mode 100644
index 90a1cfa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Listener interface to hook into RequestFuture completion.
- */
-public interface RequestFutureListener<T> {
-
- void onSuccess(T value);
-
- void onFailure(RuntimeException e);
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
deleted file mode 100644
index d94486e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.RetriableException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Exception used in {@link ConsumerNetworkClient} to indicate the failure
- * to transmit a request to the networking layer. This could be either because
- * the client is still connecting to the given host or its send buffer is full.
- */
-public class SendFailedException extends RetriableException {
- public static final SendFailedException INSTANCE = new SendFailedException();
-
- private static final long serialVersionUID = 1L;
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
deleted file mode 100644
index adff6e0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown when metadata is old and needs to be refreshed.
- */
-public class StaleMetadataException extends InvalidMetadataException {
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
deleted file mode 100644
index f5e8802..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.clients.consumer.internals;
-
-import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A class for tracking the topics, partitions, and offsets for the consumer
- */
-public class SubscriptionState {
-
- /* the list of topics the user has requested */
- private final Set<String> subscribedTopics;
-
- /* the list of partitions the user has requested */
- private final Set<TopicPartition> subscribedPartitions;
-
- /* the list of partitions currently assigned */
- private final Set<TopicPartition> assignedPartitions;
-
- /* the offset exposed to the user */
- private final Map<TopicPartition, Long> consumed;
-
- /* the current point we have fetched up to */
- private final Map<TopicPartition, Long> fetched;
-
- /* the last committed offset for each partition */
- private final Map<TopicPartition, Long> committed;
-
- /* do we need to request a partition assignment from the coordinator? */
- private boolean needsPartitionAssignment;
-
- /* do we need to request the latest committed offsets from the coordinator? */
- private boolean needsFetchCommittedOffsets;
-
- /* Partitions that need to be reset before fetching */
- private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
-
- /* Default offset reset strategy */
- private OffsetResetStrategy offsetResetStrategy;
-
- public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
- this.offsetResetStrategy = offsetResetStrategy;
- this.subscribedTopics = new HashSet<String>();
- this.subscribedPartitions = new HashSet<TopicPartition>();
- this.assignedPartitions = new HashSet<TopicPartition>();
- this.consumed = new HashMap<TopicPartition, Long>();
- this.fetched = new HashMap<TopicPartition, Long>();
- this.committed = new HashMap<TopicPartition, Long>();
- this.needsPartitionAssignment = false;
- this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up
- this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
- }
-
- public void subscribe(String topic) {
- if (this.subscribedPartitions.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
- if (!this.subscribedTopics.contains(topic)) {
- this.subscribedTopics.add(topic);
- this.needsPartitionAssignment = true;
- }
- }
-
- public void unsubscribe(String topic) {
- if (!this.subscribedTopics.contains(topic))
- throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
- this.subscribedTopics.remove(topic);
- this.needsPartitionAssignment = true;
- for (TopicPartition tp: assignedPartitions())
- if (topic.equals(tp.topic()))
- clearPartition(tp);
- }
-
- public void needReassignment() {
- this.needsPartitionAssignment = true;
- }
-
- public void subscribe(TopicPartition tp) {
- if (this.subscribedTopics.size() > 0)
- throw new IllegalStateException("Subcription to topics and partitions are mutually exclusive");
- this.subscribedPartitions.add(tp);
- this.assignedPartitions.add(tp);
- }
-
- public void unsubscribe(TopicPartition partition) {
- if (!subscribedPartitions.contains(partition))
- throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
- subscribedPartitions.remove(partition);
- clearPartition(partition);
- }
-
- private void clearPartition(TopicPartition tp) {
- this.assignedPartitions.remove(tp);
- this.committed.remove(tp);
- this.fetched.remove(tp);
- this.consumed.remove(tp);
- this.resetPartitions.remove(tp);
- }
-
- public void clearAssignment() {
- this.assignedPartitions.clear();
- this.committed.clear();
- this.fetched.clear();
- this.consumed.clear();
- this.needsPartitionAssignment = !subscribedTopics().isEmpty();
- }
-
- public Set<String> subscribedTopics() {
- return this.subscribedTopics;
- }
-
- public Long fetched(TopicPartition tp) {
- return this.fetched.get(tp);
- }
-
- public void fetched(TopicPartition tp, long offset) {
- if (!this.assignedPartitions.contains(tp))
- throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
- this.fetched.put(tp, offset);
- }
-
- public void committed(TopicPartition tp, long offset) {
- this.committed.put(tp, offset);
- }
-
- public Long committed(TopicPartition tp) {
- return this.committed.get(tp);
- }
-
- public void needRefreshCommits() {
- this.needsFetchCommittedOffsets = true;
- }
-
- public boolean refreshCommitsNeeded() {
- return this.needsFetchCommittedOffsets;
- }
-
- public void commitsRefreshed() {
- this.needsFetchCommittedOffsets = false;
- }
-
- public void seek(TopicPartition tp, long offset) {
- fetched(tp, offset);
- consumed(tp, offset);
- resetPartitions.remove(tp);
- }
-
- public Set<TopicPartition> assignedPartitions() {
- return this.assignedPartitions;
- }
-
- public boolean partitionsAutoAssigned() {
- return !this.subscribedTopics.isEmpty();
- }
-
- public void consumed(TopicPartition tp, long offset) {
- if (!this.assignedPartitions.contains(tp))
- throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
- this.consumed.put(tp, offset);
- }
-
- public Long consumed(TopicPartition partition) {
- return this.consumed.get(partition);
- }
-
- public Map<TopicPartition, Long> allConsumed() {
- return this.consumed;
- }
-
- public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
- this.resetPartitions.put(partition, offsetResetStrategy);
- this.fetched.remove(partition);
- this.consumed.remove(partition);
- }
-
- public void needOffsetReset(TopicPartition partition) {
- needOffsetReset(partition, offsetResetStrategy);
- }
-
- public boolean isOffsetResetNeeded(TopicPartition partition) {
- return resetPartitions.containsKey(partition);
- }
-
- public boolean isOffsetResetNeeded() {
- return !resetPartitions.isEmpty();
- }
-
- public OffsetResetStrategy resetStrategy(TopicPartition partition) {
- return resetPartitions.get(partition);
- }
-
- public boolean hasAllFetchPositions() {
- return this.fetched.size() >= this.assignedPartitions.size();
- }
-
- public Set<TopicPartition> missingFetchPositions() {
- Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
- copy.removeAll(this.fetched.keySet());
- return copy;
- }
-
- public boolean partitionAssignmentNeeded() {
- return this.needsPartitionAssignment;
- }
-
- public void changePartitionAssignment(List<TopicPartition> assignments) {
- for (TopicPartition tp : assignments)
- if (!this.subscribedTopics.contains(tp.topic()))
- throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
- this.clearAssignment();
- this.assignedPartitions.addAll(assignments);
- this.needsPartitionAssignment = false;
- }
-
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
deleted file mode 100644
index f5e12d3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
- */
-public final class Cluster {
-
- private final List<Node> nodes;
- private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
- private final Map<String, List<PartitionInfo>> partitionsByTopic;
- private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
- private final Map<Integer, List<PartitionInfo>> partitionsByNode;
- private final Map<Integer, Node> nodesById;
-
- /**
- * Create a new cluster with the given nodes and partitions
- * @param nodes The nodes in the cluster
- * @param partitions Information about a subset of the topic-partitions this cluster hosts
- */
- public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
- // make a randomized, unmodifiable copy of the nodes
- List<Node> copy = new ArrayList<Node>(nodes);
- Collections.shuffle(copy);
- this.nodes = Collections.unmodifiableList(copy);
-
- this.nodesById = new HashMap<Integer, Node>();
- for (Node node: nodes)
- this.nodesById.put(node.id(), node);
-
- // index the partitions by topic/partition for quick lookup
- this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
- for (PartitionInfo p : partitions)
- this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
-
- // index the partitions by topic and node respectively, and make the lists
- // unmodifiable so we can hand them out in user-facing apis without risk
- // of the client modifying the contents
- HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
- HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
- for (Node n : this.nodes) {
- partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
- }
- for (PartitionInfo p : partitions) {
- if (!partsForTopic.containsKey(p.topic()))
- partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
- List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
- psTopic.add(p);
-
- if (p.leader() != null) {
- List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
- psNode.add(p);
- }
- }
- this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
- this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
- for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
- String topic = entry.getKey();
- List<PartitionInfo> partitionList = entry.getValue();
- this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
- List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
- for (PartitionInfo part : partitionList) {
- if (part.leader() != null)
- availablePartitions.add(part);
- }
- this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
- }
- this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
- for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
- this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
-
- }
-
- /**
- * Create an empty cluster instance with no nodes and no topic-partitions.
- */
- public static Cluster empty() {
- return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
- }
-
- /**
- * Create a "bootstrap" cluster using the given list of host/ports
- * @param addresses The addresses
- * @return A cluster for these hosts/ports
- */
- public static Cluster bootstrap(List<InetSocketAddress> addresses) {
- List<Node> nodes = new ArrayList<Node>();
- int nodeId = -1;
- for (InetSocketAddress address : addresses)
- nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
- return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
- }
-
- /**
- * @return The known set of nodes
- */
- public List<Node> nodes() {
- return this.nodes;
- }
-
- /**
- * Get the node by the node id (or null if no such node exists)
- * @param id The id of the node
- * @return The node, or null if no such node exists
- */
- public Node nodeById(int id) {
- return this.nodesById.get(id);
- }
-
- /**
- * Get the current leader for the given topic-partition
- * @param topicPartition The topic and partition we want to know the leader for
- * @return The node that is the leader for this topic-partition, or null if there is currently no leader
- */
- public Node leaderFor(TopicPartition topicPartition) {
- PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
- if (info == null)
- return null;
- else
- return info.leader();
- }
-
- /**
- * Get the metadata for the specified partition
- * @param topicPartition The topic and partition to fetch info for
- * @return The metadata about the given topic and partition
- */
- public PartitionInfo partition(TopicPartition topicPartition) {
- return partitionsByTopicPartition.get(topicPartition);
- }
-
- /**
- * Get the list of partitions for this topic
- * @param topic The topic name
- * @return A list of partitions
- */
- public List<PartitionInfo> partitionsForTopic(String topic) {
- return this.partitionsByTopic.get(topic);
- }
-
- /**
- * Get the list of available partitions for this topic
- * @param topic The topic name
- * @return A list of partitions
- */
- public List<PartitionInfo> availablePartitionsForTopic(String topic) {
- return this.availablePartitionsByTopic.get(topic);
- }
-
- /**
- * Get the list of partitions whose leader is this node
- * @param nodeId The node id
- * @return A list of partitions
- */
- public List<PartitionInfo> partitionsForNode(int nodeId) {
- return this.partitionsByNode.get(nodeId);
- }
-
- /**
- * Get all topics.
- * @return a set of all topics
- */
- public Set<String> topics() {
- return this.partitionsByTopic.keySet();
- }
-
- @Override
- public String toString() {
- return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
deleted file mode 100644
index fef2136..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
- */
-public interface Configurable {
-
- /**
- * Configure this class with the given key-value pairs
- */
- public void configure(Map<String, ?> configs);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
deleted file mode 100644
index d9df6e8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The base class of all other Kafka exceptions
- */
-public class KafkaException extends RuntimeException {
-
- private final static long serialVersionUID = 1L;
-
- public KafkaException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public KafkaException(String message) {
- super(message);
- }
-
- public KafkaException(Throwable cause) {
- super(cause);
- }
-
- public KafkaException() {
- super();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
deleted file mode 100644
index 8858ffe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A numerical metric tracked for monitoring purposes
- */
-public interface Metric {
-
- /**
- * A name for this metric
- */
- public MetricName metricName();
-
- /**
- * The value of the metric
- */
- public double value();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
deleted file mode 100644
index 18dd955..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
- * <p>
- * This class captures the following parameters
- * <pre>
- * <b>name</b> The name of the metric
- * <b>group</b> logical group name of the metrics to which this metric belongs.
- * <b>description</b> A human-readable description to include in the metric. This is optional.
- * <b>tags</b> additional key/value attributes of the metric. This is optional.
- * </pre>
- * group, tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
- * <p>
- * Ex: standard JMX MBean can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
- * <p>
- * Usage looks something like this:
- * <pre>{@code
- * // set up metrics:
- * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
- * Sensor sensor = metrics.sensor("message-sizes");
- *
- * Map<String, String> metricTags = new LinkedHashMap<String, String>();
- * metricTags.put("client-id", "producer-1");
- * metricTags.put("topic", "topic");
- *
- * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
- * sensor.add(metricName, new Avg());
- *
- * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
- * sensor.add(metricName, new Max());
- *
- * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
- * sensor.add(metricName, new Min());
- *
- * // as messages are sent we record the sizes
- * sensor.record(messageSize);
- * }</pre>
- */
-public final class MetricName {
-
- private final String name;
- private final String group;
- private final String description;
- private Map<String, String> tags;
- private int hash = 0;
-
- /**
- * @param name The name of the metric
- * @param group logical group name of the metrics to which this metric belongs
- * @param description A human-readable description to include in the metric
- * @param tags additional key/value attributes of the metric
- */
- public MetricName(String name, String group, String description, Map<String, String> tags) {
- this.name = Utils.notNull(name);
- this.group = Utils.notNull(group);
- this.description = Utils.notNull(description);
- this.tags = Utils.notNull(tags);
- }
-
- /**
- * @param name The name of the metric
- * @param group logical group name of the metrics to which this metric belongs
- * @param description A human-readable description to include in the metric
- * @param keyValue additional key/value attributes of the metric (must come in pairs)
- */
- public MetricName(String name, String group, String description, String... keyValue) {
- this(name, group, description, getTags(keyValue));
- }
-
- private static Map<String, String> getTags(String... keyValue) {
- if ((keyValue.length % 2) != 0)
- throw new IllegalArgumentException("keyValue needs to be specified in paris");
- Map<String, String> tags = new HashMap<String, String>();
-
- for (int i = 0; i < keyValue.length / 2; i++)
- tags.put(keyValue[i], keyValue[i + 1]);
- return tags;
- }
-
- /**
- * @param name The name of the metric
- * @param group logical group name of the metrics to which this metric belongs
- * @param tags key/value attributes of the metric
- */
- public MetricName(String name, String group, Map<String, String> tags) {
- this(name, group, "", tags);
- }
-
- /**
- * @param name The name of the metric
- * @param group logical group name of the metrics to which this metric belongs
- * @param description A human-readable description to include in the metric
- */
- public MetricName(String name, String group, String description) {
- this(name, group, description, new HashMap<String, String>());
- }
-
- /**
- * @param name The name of the metric
- * @param group logical group name of the metrics to which this metric belongs
- */
- public MetricName(String name, String group) {
- this(name, group, "", new HashMap<String, String>());
- }
-
- public String name() {
- return this.name;
- }
-
- public String group() {
- return this.group;
- }
-
- public Map<String, String> tags() {
- return this.tags;
- }
-
- public String description() {
- return this.description;
- }
-
- @Override
- public int hashCode() {
- if (hash != 0)
- return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + ((group == null) ? 0 : group.hashCode());
- result = prime * result + ((name == null) ? 0 : name.hashCode());
- result = prime * result + ((tags == null) ? 0 : tags.hashCode());
- this.hash = result;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- MetricName other = (MetricName) obj;
- if (group == null) {
- if (other.group != null)
- return false;
- } else if (!group.equals(other.group))
- return false;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- if (tags == null) {
- if (other.tags != null)
- return false;
- } else if (!tags.equals(other.tags))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "MetricName [name=" + name + ", group=" + group + ", description="
- + description + ", tags=" + tags + "]";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
deleted file mode 100644
index dd0537e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-import java.io.Serializable;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Information about a Kafka node
- */
-public class Node implements Serializable {
-
- private final int id;
- private final String idString;
- private final String host;
- private final int port;
-
- public Node(int id, String host, int port) {
- super();
- this.id = id;
- this.idString = Integer.toString(id);
- this.host = host;
- this.port = port;
- }
-
- public static Node noNode() {
- return new Node(-1, "", -1);
- }
-
- /**
- * The node id of this node
- */
- public int id() {
- return id;
- }
-
- /**
- * String representation of the node id.
- * Typically the integer id is used to serialize over the wire, the string representation is used as an identifier with NetworkClient code
- */
- public String idString() {
- return idString;
- }
-
- /**
- * The host name for this node
- */
- public String host() {
- return host;
- }
-
- /**
- * The port for this node
- */
- public int port() {
- return port;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + id;
- result = prime * result + port;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Node other = (Node) obj;
- if (host == null) {
- if (other.host != null)
- return false;
- } else if (!host.equals(other.host))
- return false;
- if (id != other.id)
- return false;
- if (port != other.port)
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return "Node(" + id + ", " + host + ", " + port + ")";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
deleted file mode 100644
index ac7cc61..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Information about a topic-partition.
- */
-public class PartitionInfo {
-
- private final String topic;
- private final int partition;
- private final Node leader;
- private final Node[] replicas;
- private final Node[] inSyncReplicas;
-
- public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
- this.topic = topic;
- this.partition = partition;
- this.leader = leader;
- this.replicas = replicas;
- this.inSyncReplicas = inSyncReplicas;
- }
-
- /**
- * The topic name
- */
- public String topic() {
- return topic;
- }
-
- /**
- * The partition id
- */
- public int partition() {
- return partition;
- }
-
- /**
- * The node id of the node currently acting as a leader for this partition or -1 if there is no leader
- */
- public Node leader() {
- return leader;
- }
-
- /**
- * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
- */
- public Node[] replicas() {
- return replicas;
- }
-
- /**
- * The subset of the replicas that are in sync, that is caught-up to the leader and ready to take over as leader if
- * the leader should fail
- */
- public Node[] inSyncReplicas() {
- return inSyncReplicas;
- }
-
- @Override
- public String toString() {
- return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s",
- topic,
- partition,
- leader == null ? "none" : leader.id(),
- fmtNodeIds(replicas),
- fmtNodeIds(inSyncReplicas));
- }
-
- /* Extract the node ids from each item in the array and format for display */
- private String fmtNodeIds(Node[] nodes) {
- StringBuilder b = new StringBuilder("[");
- for (int i = 0; i < nodes.length - 1; i++) {
- b.append(Integer.toString(nodes[i].id()));
- b.append(',');
- }
- if (nodes.length > 0) {
- b.append(Integer.toString(nodes[nodes.length - 1].id()));
- b.append(',');
- }
- b.append("]");
- return b.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
deleted file mode 100644
index cfb4848..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.kafka_backport.common;
-
-import java.io.Serializable;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A topic name and partition number
- */
-public final class TopicPartition implements Serializable {
-
- private int hash = 0;
- private final int partition;
- private final String topic;
-
- public TopicPartition(String topic, int partition) {
- this.partition = partition;
- this.topic = topic;
- }
-
- public int partition() {
- return partition;
- }
-
- public String topic() {
- return topic;
- }
-
- @Override
- public int hashCode() {
- if (hash != 0)
- return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + partition;
- result = prime * result + ((topic == null) ? 0 : topic.hashCode());
- this.hash = result;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TopicPartition other = (TopicPartition) obj;
- if (partition != other.partition)
- return false;
- if (topic == null) {
- if (other.topic != null)
- return false;
- } else if (!topic.equals(other.topic))
- return false;
- return true;
- }
-
- @Override
- public String toString() {
- return topic + "-" + partition;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
deleted file mode 100644
index 1b5cbc9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.Configurable;
-import org.apache.flink.kafka_backport.common.KafkaException;
-import org.apache.flink.kafka_backport.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A convenient base class for configurations to extend.
- * <p>
- * This class holds both the original configuration that was provided as well as the parsed
- */
-public class AbstractConfig {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- /* configs for which values have been requested, used to detect unused configs */
- private final Set<String> used;
-
- /* the original values passed in by the user */
- private final Map<String, ?> originals;
-
- /* the parsed values */
- private final Map<String, Object> values;
-
- @SuppressWarnings("unchecked")
- public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
- /* check that all the keys are really strings */
- for (Object key : originals.keySet())
- if (!(key instanceof String))
- throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
- this.originals = (Map<String, ?>) originals;
- this.values = definition.parse(this.originals);
- this.used = Collections.synchronizedSet(new HashSet<String>());
- logAll();
- }
-
- protected Object get(String key) {
- if (!values.containsKey(key))
- throw new ConfigException(String.format("Unknown configuration '%s'", key));
- used.add(key);
- return values.get(key);
- }
-
- public Short getShort(String key) {
- return (Short) get(key);
- }
-
- public Integer getInt(String key) {
- return (Integer) get(key);
- }
-
- public Long getLong(String key) {
- return (Long) get(key);
- }
-
- public Double getDouble(String key) {
- return (Double) get(key);
- }
-
- @SuppressWarnings("unchecked")
- public List<String> getList(String key) {
- return (List<String>) get(key);
- }
-
- public boolean getBoolean(String key) {
- return (Boolean) get(key);
- }
-
- public String getString(String key) {
- return (String) get(key);
- }
-
- public Class<?> getClass(String key) {
- return (Class<?>) get(key);
- }
-
- public Set<String> unused() {
- Set<String> keys = new HashSet<String>(originals.keySet());
- keys.removeAll(used);
- return keys;
- }
-
- public Map<String, Object> originals() {
- Map<String, Object> copy = new HashMap<String, Object>();
- copy.putAll(originals);
- return copy;
- }
-
- private void logAll() {
- StringBuilder b = new StringBuilder();
- b.append(getClass().getSimpleName());
- b.append(" values: ");
- b.append(Utils.NL);
- for (Map.Entry<String, Object> entry : this.values.entrySet()) {
- b.append('\t');
- b.append(entry.getKey());
- b.append(" = ");
- b.append(entry.getValue());
- b.append(Utils.NL);
- }
- log.info(b.toString());
- }
-
- /**
- * Log warnings for any unused configurations
- */
- public void logUnused() {
- for (String key : unused())
- log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.values.get(key));
- }
-
- /**
- * Get a configured instance of the give class specified by the given configuration key. If the object implements
- * Configurable configure it using the configuration.
- *
- * @param key The configuration key for the class
- * @param t The interface the class should implement
- * @return A configured instance of the class
- */
- public <T> T getConfiguredInstance(String key, Class<T> t) {
- Class<?> c = getClass(key);
- if (c == null)
- return null;
- Object o = Utils.newInstance(c);
- if (!t.isInstance(o))
- throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
- if (o instanceof Configurable)
- ((Configurable) o).configure(this.originals);
- return t.cast(o);
- }
-
- public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
- List<String> klasses = getList(key);
- List<T> objects = new ArrayList<T>();
- for (String klass : klasses) {
- Class<?> c;
- try {
- c = Class.forName(klass);
- } catch (ClassNotFoundException e) {
- throw new ConfigException(key, klass, "Class " + klass + " could not be found.");
- }
- if (c == null)
- return null;
- Object o = Utils.newInstance(c);
- if (!t.isInstance(o))
- throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
- if (o instanceof Configurable)
- ((Configurable) o).configure(this.originals);
- objects.add(t.cast(o));
- }
- return objects;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
deleted file mode 100644
index 1bbe891..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This class is used for specifying the set of expected configurations, their type, their defaults, their
- * documentation, and any special validation logic used for checking the correctness of the values the user provides.
- * <p/>
- * Usage of this class looks something like this:
- * <p/>
- * <pre>
- * ConfigDef defs = new ConfigDef();
- * defs.define("config_name", Type.STRING, "default string value", "This configuration is used for blah blah blah.");
- * defs.define("another_config_name", Type.INT, 42, Range.atLeast(0), "More documentation on this config");
- *
- * Properties props = new Properties();
- * props.setProperty("config_name", "some value");
- * Map<String, Object> configs = defs.parse(props);
- *
- * String someConfig = (String) configs.get("config_name"); // will return "some value"
- * int anotherConfig = (Integer) configs.get("another_config_name"); // will return default value of 42
- * </pre>
- * <p/>
- * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
- * functionality for accessing configs.
- */
-public class ConfigDef {
-
- private static final Object NO_DEFAULT_VALUE = new String("");
-
- private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
-
- /**
- * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
- *
- * @return new unmodifiable {@link Set} instance containing the keys
- */
- public Set<String> names() {
- return Collections.unmodifiableSet(configKeys.keySet());
- }
-
- /**
- * Define a new configuration
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @param required Should the config fail if given property is not set and doesn't have default value specified
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
- boolean required) {
- if (configKeys.containsKey(name))
- throw new ConfigException("Configuration " + name + " is defined twice.");
- Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
- configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
- return this;
- }
-
- /**
- * Define a new required configuration
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
- return define(name, type, defaultValue, validator, importance, documentation, true);
- }
-
- /**
- * Define a new configuration with no special validation logic
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param defaultValue The default value to use if this config isn't present
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
- return define(name, type, defaultValue, null, importance, documentation, true);
- }
-
- /**
- * Define a required parameter with no default value
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param validator A validator to use in checking the correctness of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
- return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
- }
-
- /**
- * Define a required parameter with no default value and no special validation logic
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Importance importance, String documentation) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
- }
-
- /**
- * Define a required parameter with no default value and no special validation logic
- *
- * @param name The name of the config parameter
- * @param type The type of the config
- * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config
- * @param required Should the config fail if given property is not set and doesn't have default value specified
- * @return This ConfigDef so you can chain calls
- */
- public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
- return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
- }
-
-
- /**
- * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
- * that the keys of the map are strings, but the values can either be strings or they may already be of the
- * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
- * programmatically constructed map.
- *
- * @param props The configs to parse and validate
- * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
- * the appropriate type (int, string, etc)
- */
- public Map<String, Object> parse(Map<?, ?> props) {
- /* parse all known keys */
- Map<String, Object> values = new HashMap<String, Object>();
- for (ConfigKey key : configKeys.values()) {
- Object value;
- // props map contains setting - assign ConfigKey value
- if (props.containsKey(key.name))
- value = parseType(key.name, props.get(key.name), key.type);
- // props map doesn't contain setting, the key is required and no default value specified - it's an error
- else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
- throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
- // props map doesn't contain setting, no default value specified and the key is not required - assign it to null
- else if (!key.hasDefault() && !key.required)
- value = null;
- // otherwise assign setting it's default value
- else
- value = key.defaultValue;
- if (key.validator != null)
- key.validator.ensureValid(key.name, value);
- values.put(key.name, value);
- }
- return values;
- }
-
- /**
- * Parse a value according to its expected type.
- *
- * @param name The config name
- * @param value The config value
- * @param type The expected type
- * @return The parsed object
- */
- private Object parseType(String name, Object value, Type type) {
- try {
- String trimmed = null;
- if (value instanceof String)
- trimmed = ((String) value).trim();
- switch (type) {
- case BOOLEAN:
- if (value instanceof String) {
- if (trimmed.equalsIgnoreCase("true"))
- return true;
- else if (trimmed.equalsIgnoreCase("false"))
- return false;
- else
- throw new ConfigException(name, value, "Expected value to be either true or false");
- } else if (value instanceof Boolean)
- return value;
- else
- throw new ConfigException(name, value, "Expected value to be either true or false");
- case STRING:
- if (value instanceof String)
- return trimmed;
- else
- throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
- case INT:
- if (value instanceof Integer) {
- return (Integer) value;
- } else if (value instanceof String) {
- return Integer.parseInt(trimmed);
- } else {
- throw new ConfigException(name, value, "Expected value to be an number.");
- }
- case SHORT:
- if (value instanceof Short) {
- return (Short) value;
- } else if (value instanceof String) {
- return Short.parseShort(trimmed);
- } else {
- throw new ConfigException(name, value, "Expected value to be an number.");
- }
- case LONG:
- if (value instanceof Integer)
- return ((Integer) value).longValue();
- if (value instanceof Long)
- return (Long) value;
- else if (value instanceof String)
- return Long.parseLong(trimmed);
- else
- throw new ConfigException(name, value, "Expected value to be an number.");
- case DOUBLE:
- if (value instanceof Number)
- return ((Number) value).doubleValue();
- else if (value instanceof String)
- return Double.parseDouble(trimmed);
- else
- throw new ConfigException(name, value, "Expected value to be an number.");
- case LIST:
- if (value instanceof List)
- return (List<?>) value;
- else if (value instanceof String)
- if (trimmed.isEmpty())
- return Collections.emptyList();
- else
- return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
- else
- throw new ConfigException(name, value, "Expected a comma separated list.");
- case CLASS:
- if (value instanceof Class)
- return (Class<?>) value;
- else if (value instanceof String)
- return Class.forName(trimmed);
- else
- throw new ConfigException(name, value, "Expected a Class instance or class name.");
- default:
- throw new IllegalStateException("Unknown type.");
- }
- } catch (NumberFormatException e) {
- throw new ConfigException(name, value, "Not a number of type " + type);
- } catch (ClassNotFoundException e) {
- throw new ConfigException(name, value, "Class " + value + " could not be found.");
- }
- }
-
- /**
- * The config types
- */
- public enum Type {
- BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
- }
-
- public enum Importance {
- HIGH, MEDIUM, LOW
- }
-
- /**
- * Validation logic the user may provide
- */
- public interface Validator {
- public void ensureValid(String name, Object o);
- }
-
- /**
- * Validation logic for numeric ranges
- */
- public static class Range implements Validator {
- private final Number min;
- private final Number max;
-
- private Range(Number min, Number max) {
- this.min = min;
- this.max = max;
- }
-
- /**
- * A numeric range that checks only the lower bound
- *
- * @param min The minimum acceptable value
- */
- public static Range atLeast(Number min) {
- return new Range(min, null);
- }
-
- /**
- * A numeric range that checks both the upper and lower bound
- */
- public static Range between(Number min, Number max) {
- return new Range(min, max);
- }
-
- public void ensureValid(String name, Object o) {
- Number n = (Number) o;
- if (min != null && n.doubleValue() < min.doubleValue())
- throw new ConfigException(name, o, "Value must be at least " + min);
- if (max != null && n.doubleValue() > max.doubleValue())
- throw new ConfigException(name, o, "Value must be no more than " + max);
- }
-
- public String toString() {
- if (min == null)
- return "[...," + max + "]";
- else if (max == null)
- return "[" + min + ",...]";
- else
- return "[" + min + ",...," + max + "]";
- }
- }
-
- public static class ValidString implements Validator {
- List<String> validStrings;
-
- private ValidString(List<String> validStrings) {
- this.validStrings = validStrings;
- }
-
- public static ValidString in(String... validStrings) {
- return new ValidString(Arrays.asList(validStrings));
- }
-
- @Override
- public void ensureValid(String name, Object o) {
- String s = (String) o;
- if (!validStrings.contains(s)) {
- throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
- }
-
- }
-
- public String toString() {
- return "[" + Utils.join(validStrings, ", ") + "]";
- }
- }
-
- private static class ConfigKey {
- public final String name;
- public final Type type;
- public final String documentation;
- public final Object defaultValue;
- public final Validator validator;
- public final Importance importance;
- public final boolean required;
-
- public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
- super();
- this.name = name;
- this.type = type;
- this.defaultValue = defaultValue;
- this.validator = validator;
- this.importance = importance;
- if (this.validator != null)
- this.validator.ensureValid(name, defaultValue);
- this.documentation = documentation;
- this.required = required;
- }
-
- public boolean hasDefault() {
- return this.defaultValue != NO_DEFAULT_VALUE;
- }
-
- }
-
- public String toHtmlTable() {
- // sort first required fields, then by importance, then name
- List<ConfigKey> configs = new ArrayList<ConfigKey>(this.configKeys.values());
- Collections.sort(configs, new Comparator<ConfigKey>() {
- public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
- // first take anything with no default value
- if (!k1.hasDefault() && k2.hasDefault())
- return -1;
- else if (!k2.hasDefault() && k1.hasDefault())
- return 1;
-
- // then sort by importance
- int cmp = k1.importance.compareTo(k2.importance);
- if (cmp == 0)
- // then sort in alphabetical order
- return k1.name.compareTo(k2.name);
- else
- return cmp;
- }
- });
- StringBuilder b = new StringBuilder();
- b.append("<table>\n");
- b.append("<tr>\n");
- b.append("<th>Name</th>\n");
- b.append("<th>Type</th>\n");
- b.append("<th>Default</th>\n");
- b.append("<th>Importance</th>\n");
- b.append("<th>Description</th>\n");
- b.append("</tr>\n");
- for (ConfigKey def : configs) {
- b.append("<tr>\n");
- b.append("<td>");
- b.append(def.name);
- b.append("</td>");
- b.append("<td>");
- b.append(def.type.toString().toLowerCase());
- b.append("</td>");
- b.append("<td>");
- b.append(def.defaultValue == null ? "" : def.defaultValue);
- b.append("</td>");
- b.append("<td>");
- b.append(def.importance.toString().toLowerCase());
- b.append("</td>");
- b.append("<td>");
- b.append(def.documentation);
- b.append("</td>");
- b.append("</tr>\n");
- }
- b.append("</table>");
- return b.toString();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
deleted file mode 100644
index 13b9410..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.config;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown if the user supplies an invalid configuration
- */
-public class ConfigException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public ConfigException(String message) {
- super(message);
- }
-
- public ConfigException(String name, Object value) {
- this(name, value, null);
- }
-
- public ConfigException(String name, Object value, String message) {
- super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
deleted file mode 100644
index 1e6f7ec..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.errors;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Any API exception that is part of the public protocol and should be a subclass of this class and be part of this
- * package.
- */
-public class ApiException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public ApiException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ApiException(String message) {
- super(message);
- }
-
- public ApiException(Throwable cause) {
- super(cause);
- }
-
- public ApiException() {
- super();
- }
-
- /* avoid the expensive and useless stack trace for api exceptions */
- @Override
- public Throwable fillInStackTrace() {
- return this;
- }
-
-}