You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:41 UTC

[24/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'

[FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33f4c818
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33f4c818
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33f4c818

Branch: refs/heads/master
Commit: 33f4c818dd81d259f5b6c06f5caeda0376c40750
Parents: fb5aac2
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 11 16:48:26 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 12:40:38 2015 +0200

----------------------------------------------------------------------
 .../kafka_backport/clients/ClientRequest.java   |   75 ++
 .../kafka_backport/clients/ClientResponse.java  |   87 ++
 .../kafka_backport/clients/ClientUtils.java     |   71 ++
 .../clients/ClusterConnectionStates.java        |  170 +++
 .../clients/CommonClientConfigs.java            |   69 ++
 .../kafka_backport/clients/ConnectionState.java |   29 +
 .../clients/InFlightRequests.java               |  135 +++
 .../kafka_backport/clients/KafkaClient.java     |  143 +++
 .../flink/kafka_backport/clients/Metadata.java  |  195 +++
 .../kafka_backport/clients/NetworkClient.java   |  528 ++++++++
 .../clients/RequestCompletionHandler.java       |   32 +
 .../clients/consumer/CommitType.java            |   26 +
 .../clients/consumer/Consumer.java              |  135 +++
 .../consumer/ConsumerCommitCallback.java        |   42 +
 .../clients/consumer/ConsumerConfig.java        |  334 ++++++
 .../consumer/ConsumerRebalanceCallback.java     |  104 ++
 .../clients/consumer/ConsumerRecord.java        |   93 ++
 .../clients/consumer/ConsumerRecords.java       |  126 ++
 .../consumer/ConsumerWakeupException.java       |   29 +
 .../clients/consumer/KafkaConsumer.java         | 1130 ++++++++++++++++++
 .../clients/consumer/MockConsumer.java          |  209 ++++
 .../consumer/NoOffsetForPartitionException.java |   38 +
 .../clients/consumer/OffsetResetStrategy.java   |   26 +
 .../internals/ConsumerNetworkClient.java        |  296 +++++
 .../clients/consumer/internals/Coordinator.java |  791 ++++++++++++
 .../clients/consumer/internals/DelayedTask.java |   24 +
 .../consumer/internals/DelayedTaskQueue.java    |   96 ++
 .../clients/consumer/internals/Fetcher.java     |  506 ++++++++
 .../clients/consumer/internals/Heartbeat.java   |   83 ++
 .../internals/NoAvailableBrokersException.java  |   32 +
 .../NoOpConsumerRebalanceCallback.java          |   39 +
 .../consumer/internals/RequestFuture.java       |  211 ++++
 .../internals/RequestFutureAdapter.java         |   37 +
 .../internals/RequestFutureListener.java        |   32 +
 .../consumer/internals/SendFailedException.java |   36 +
 .../internals/StaleMetadataException.java       |   31 +
 .../consumer/internals/SubscriptionState.java   |  242 ++++
 .../flink/kafka_backport/common/Cluster.java    |  203 ++++
 .../kafka_backport/common/Configurable.java     |   40 +
 .../kafka_backport/common/KafkaException.java   |   51 +
 .../flink/kafka_backport/common/Metric.java     |   43 +
 .../flink/kafka_backport/common/MetricName.java |  194 +++
 .../flink/kafka_backport/common/Node.java       |  113 ++
 .../kafka_backport/common/PartitionInfo.java    |  104 ++
 .../kafka_backport/common/TopicPartition.java   |   89 ++
 .../common/config/AbstractConfig.java           |  185 +++
 .../kafka_backport/common/config/ConfigDef.java |  456 +++++++
 .../common/config/ConfigException.java          |   49 +
 .../common/errors/ApiException.java             |   60 +
 ...onsumerCoordinatorNotAvailableException.java |   49 +
 .../common/errors/CorruptRecordException.java   |   48 +
 .../common/errors/DisconnectException.java      |   47 +
 .../errors/IllegalGenerationException.java      |   42 +
 .../common/errors/InterruptException.java       |   48 +
 .../common/errors/InvalidMetadataException.java |   48 +
 .../errors/InvalidRequiredAcksException.java    |   34 +
 .../common/errors/InvalidTopicException.java    |   47 +
 .../errors/LeaderNotAvailableException.java     |   36 +
 .../common/errors/NetworkException.java         |   48 +
 .../NotCoordinatorForConsumerException.java     |   49 +
 .../NotEnoughReplicasAfterAppendException.java  |   39 +
 .../errors/NotEnoughReplicasException.java      |   49 +
 .../errors/NotLeaderForPartitionException.java  |   47 +
 .../errors/OffsetLoadInProgressException.java   |   49 +
 .../common/errors/OffsetMetadataTooLarge.java   |   46 +
 .../errors/OffsetOutOfRangeException.java       |   47 +
 .../errors/RecordBatchTooLargeException.java    |   48 +
 .../common/errors/RecordTooLargeException.java  |   47 +
 .../common/errors/RetriableException.java       |   46 +
 .../common/errors/SerializationException.java   |   55 +
 .../common/errors/TimeoutException.java         |   47 +
 .../errors/UnknownConsumerIdException.java      |   42 +
 .../common/errors/UnknownServerException.java   |   48 +
 .../UnknownTopicOrPartitionException.java       |   46 +
 .../common/metrics/CompoundStat.java            |   61 +
 .../common/metrics/JmxReporter.java             |  225 ++++
 .../common/metrics/KafkaMetric.java             |   74 ++
 .../common/metrics/Measurable.java              |   37 +
 .../common/metrics/MeasurableStat.java          |   38 +
 .../common/metrics/MetricConfig.java            |   96 ++
 .../kafka_backport/common/metrics/Metrics.java  |  211 ++++
 .../common/metrics/MetricsReporter.java         |   50 +
 .../kafka_backport/common/metrics/Quota.java    |   61 +
 .../common/metrics/QuotaViolationException.java |   41 +
 .../kafka_backport/common/metrics/Sensor.java   |  182 +++
 .../kafka_backport/common/metrics/Stat.java     |   41 +
 .../common/metrics/stats/Avg.java               |   54 +
 .../common/metrics/stats/Count.java             |   50 +
 .../common/metrics/stats/Histogram.java         |  166 +++
 .../common/metrics/stats/Max.java               |   50 +
 .../common/metrics/stats/Min.java               |   50 +
 .../common/metrics/stats/Percentile.java        |   49 +
 .../common/metrics/stats/Percentiles.java       |  125 ++
 .../common/metrics/stats/Rate.java              |  115 ++
 .../common/metrics/stats/SampledStat.java       |  139 +++
 .../common/metrics/stats/Total.java             |   52 +
 .../common/network/ByteBufferReceive.java       |   66 +
 .../common/network/ByteBufferSend.java          |   71 ++
 .../common/network/InvalidReceiveException.java |   39 +
 .../common/network/MultiSend.java               |  108 ++
 .../common/network/NetworkReceive.java          |  128 ++
 .../common/network/NetworkSend.java             |   51 +
 .../kafka_backport/common/network/Receive.java  |   54 +
 .../common/network/Selectable.java              |  115 ++
 .../kafka_backport/common/network/Selector.java |  664 ++++++++++
 .../kafka_backport/common/network/Send.java     |   56 +
 .../kafka_backport/common/protocol/ApiKeys.java |   75 ++
 .../kafka_backport/common/protocol/Errors.java  |  172 +++
 .../common/protocol/ProtoUtils.java             |   74 ++
 .../common/protocol/Protocol.java               |  474 ++++++++
 .../common/protocol/SecurityProtocol.java       |   72 ++
 .../common/protocol/types/ArrayOf.java          |   88 ++
 .../common/protocol/types/Field.java            |   78 ++
 .../common/protocol/types/Schema.java           |  168 +++
 .../common/protocol/types/SchemaException.java  |   41 +
 .../common/protocol/types/Struct.java           |  338 ++++++
 .../common/protocol/types/Type.java             |  259 ++++
 .../common/record/ByteBufferInputStream.java    |   58 +
 .../common/record/ByteBufferOutputStream.java   |   66 +
 .../common/record/CompressionType.java          |   72 ++
 .../common/record/Compressor.java               |  279 +++++
 .../common/record/InvalidRecordException.java   |   36 +
 .../common/record/KafkaLZ4BlockInputStream.java |  236 ++++
 .../record/KafkaLZ4BlockOutputStream.java       |  400 +++++++
 .../kafka_backport/common/record/LogEntry.java  |   57 +
 .../common/record/MemoryRecords.java            |  280 +++++
 .../kafka_backport/common/record/Record.java    |  352 ++++++
 .../kafka_backport/common/record/Records.java   |   54 +
 .../common/requests/AbstractRequest.java        |   71 ++
 .../requests/AbstractRequestResponse.java       |   75 ++
 .../requests/ConsumerMetadataRequest.java       |   74 ++
 .../requests/ConsumerMetadataResponse.java      |   79 ++
 .../common/requests/FetchRequest.java           |  174 +++
 .../common/requests/FetchResponse.java          |  134 +++
 .../common/requests/HeartbeatRequest.java       |   90 ++
 .../common/requests/HeartbeatResponse.java      |   64 +
 .../common/requests/JoinGroupRequest.java       |  121 ++
 .../common/requests/JoinGroupResponse.java      |  122 ++
 .../common/requests/ListOffsetRequest.java      |  151 +++
 .../common/requests/ListOffsetResponse.java     |  127 ++
 .../common/requests/MetadataRequest.java        |   89 ++
 .../common/requests/MetadataResponse.java       |  186 +++
 .../common/requests/OffsetCommitRequest.java    |  275 +++++
 .../common/requests/OffsetCommitResponse.java   |  109 ++
 .../common/requests/OffsetFetchRequest.java     |  132 ++
 .../common/requests/OffsetFetchResponse.java    |  135 +++
 .../common/requests/ProduceRequest.java         |  141 +++
 .../common/requests/ProduceResponse.java        |  131 ++
 .../common/requests/RequestHeader.java          |   89 ++
 .../common/requests/RequestSend.java            |   64 +
 .../common/requests/ResponseHeader.java         |   62 +
 .../common/requests/ResponseSend.java           |   50 +
 .../serialization/ByteArrayDeserializer.java    |   43 +
 .../serialization/ByteArraySerializer.java      |   43 +
 .../common/serialization/Deserializer.java      |   53 +
 .../serialization/IntegerDeserializer.java      |   53 +
 .../common/serialization/IntegerSerializer.java |   47 +
 .../common/serialization/Serializer.java        |   58 +
 .../serialization/StringDeserializer.java       |   62 +
 .../common/serialization/StringSerializer.java  |   62 +
 .../common/utils/AbstractIterator.java          |   97 ++
 .../common/utils/CollectionUtils.java           |   71 ++
 .../common/utils/CopyOnWriteMap.java            |  151 +++
 .../kafka_backport/common/utils/Crc32.java      |  396 ++++++
 .../common/utils/KafkaThread.java               |   44 +
 .../kafka_backport/common/utils/SystemTime.java |   52 +
 .../flink/kafka_backport/common/utils/Time.java |   48 +
 .../kafka_backport/common/utils/Utils.java      |  506 ++++++++
 .../flink/kafka_backport/package-info.java      |   10 +
 .../streaming/connectors/internals/Fetcher.java |    2 +-
 .../internals/FlinkKafkaConsumerBase.java       |   10 +-
 .../connectors/internals/IncludedFetcher.java   |   12 +-
 .../connectors/internals/LegacyFetcher.java     |    6 +-
 .../kafka/copied/clients/ClientRequest.java     |   66 -
 .../kafka/copied/clients/ClientResponse.java    |   78 --
 .../kafka/copied/clients/ClientUtils.java       |   64 -
 .../copied/clients/ClusterConnectionStates.java |  161 ---
 .../copied/clients/CommonClientConfigs.java     |   60 -
 .../kafka/copied/clients/ConnectionState.java   |   20 -
 .../kafka/copied/clients/InFlightRequests.java  |  126 --
 .../kafka/copied/clients/KafkaClient.java       |  134 ---
 .../apache/kafka/copied/clients/Metadata.java   |  186 ---
 .../kafka/copied/clients/NetworkClient.java     |  519 --------
 .../clients/RequestCompletionHandler.java       |   23 -
 .../copied/clients/consumer/CommitType.java     |   17 -
 .../kafka/copied/clients/consumer/Consumer.java |  126 --
 .../consumer/ConsumerCommitCallback.java        |   33 -
 .../copied/clients/consumer/ConsumerConfig.java |  325 -----
 .../consumer/ConsumerRebalanceCallback.java     |   95 --
 .../copied/clients/consumer/ConsumerRecord.java |   84 --
 .../clients/consumer/ConsumerRecords.java       |  117 --
 .../consumer/ConsumerWakeupException.java       |   20 -
 .../copied/clients/consumer/KafkaConsumer.java  | 1121 -----------------
 .../copied/clients/consumer/MockConsumer.java   |  200 ----
 .../consumer/NoOffsetForPartitionException.java |   29 -
 .../clients/consumer/OffsetResetStrategy.java   |   17 -
 .../internals/ConsumerNetworkClient.java        |  296 -----
 .../clients/consumer/internals/Coordinator.java |  791 ------------
 .../clients/consumer/internals/DelayedTask.java |   24 -
 .../consumer/internals/DelayedTaskQueue.java    |   96 --
 .../clients/consumer/internals/Fetcher.java     |  498 --------
 .../clients/consumer/internals/Heartbeat.java   |   74 --
 .../internals/NoAvailableBrokersException.java  |   23 -
 .../NoOpConsumerRebalanceCallback.java          |   30 -
 .../consumer/internals/RequestFuture.java       |  202 ----
 .../internals/RequestFutureAdapter.java         |   28 -
 .../internals/RequestFutureListener.java        |   23 -
 .../consumer/internals/SendFailedException.java |   27 -
 .../internals/StaleMetadataException.java       |   22 -
 .../consumer/internals/SubscriptionState.java   |  233 ----
 .../org/apache/kafka/copied/common/Cluster.java |  194 ---
 .../kafka/copied/common/Configurable.java       |   31 -
 .../kafka/copied/common/KafkaException.java     |   42 -
 .../org/apache/kafka/copied/common/Metric.java  |   34 -
 .../apache/kafka/copied/common/MetricName.java  |  185 ---
 .../org/apache/kafka/copied/common/Node.java    |  104 --
 .../kafka/copied/common/PartitionInfo.java      |   95 --
 .../kafka/copied/common/TopicPartition.java     |   79 --
 .../copied/common/config/AbstractConfig.java    |  176 ---
 .../kafka/copied/common/config/ConfigDef.java   |  447 -------
 .../copied/common/config/ConfigException.java   |   40 -
 .../copied/common/errors/ApiException.java      |   51 -
 ...onsumerCoordinatorNotAvailableException.java |   40 -
 .../common/errors/CorruptRecordException.java   |   39 -
 .../common/errors/DisconnectException.java      |   39 -
 .../errors/IllegalGenerationException.java      |   33 -
 .../common/errors/InterruptException.java       |   39 -
 .../common/errors/InvalidMetadataException.java |   39 -
 .../errors/InvalidRequiredAcksException.java    |   25 -
 .../common/errors/InvalidTopicException.java    |   38 -
 .../errors/LeaderNotAvailableException.java     |   27 -
 .../copied/common/errors/NetworkException.java  |   39 -
 .../NotCoordinatorForConsumerException.java     |   40 -
 .../NotEnoughReplicasAfterAppendException.java  |   30 -
 .../errors/NotEnoughReplicasException.java      |   40 -
 .../errors/NotLeaderForPartitionException.java  |   38 -
 .../errors/OffsetLoadInProgressException.java   |   40 -
 .../common/errors/OffsetMetadataTooLarge.java   |   37 -
 .../errors/OffsetOutOfRangeException.java       |   38 -
 .../errors/RecordBatchTooLargeException.java    |   39 -
 .../common/errors/RecordTooLargeException.java  |   38 -
 .../common/errors/RetriableException.java       |   37 -
 .../common/errors/SerializationException.java   |   46 -
 .../copied/common/errors/TimeoutException.java  |   38 -
 .../errors/UnknownConsumerIdException.java      |   33 -
 .../common/errors/UnknownServerException.java   |   39 -
 .../UnknownTopicOrPartitionException.java       |   37 -
 .../copied/common/metrics/CompoundStat.java     |   52 -
 .../copied/common/metrics/JmxReporter.java      |  216 ----
 .../copied/common/metrics/KafkaMetric.java      |   65 -
 .../kafka/copied/common/metrics/Measurable.java |   28 -
 .../copied/common/metrics/MeasurableStat.java   |   30 -
 .../copied/common/metrics/MetricConfig.java     |   87 --
 .../kafka/copied/common/metrics/Metrics.java    |  202 ----
 .../copied/common/metrics/MetricsReporter.java  |   41 -
 .../kafka/copied/common/metrics/Quota.java      |   52 -
 .../common/metrics/QuotaViolationException.java |   32 -
 .../kafka/copied/common/metrics/Sensor.java     |  173 ---
 .../kafka/copied/common/metrics/Stat.java       |   32 -
 .../kafka/copied/common/metrics/stats/Avg.java  |   45 -
 .../copied/common/metrics/stats/Count.java      |   41 -
 .../copied/common/metrics/stats/Histogram.java  |  157 ---
 .../kafka/copied/common/metrics/stats/Max.java  |   41 -
 .../kafka/copied/common/metrics/stats/Min.java  |   41 -
 .../copied/common/metrics/stats/Percentile.java |   40 -
 .../common/metrics/stats/Percentiles.java       |  116 --
 .../kafka/copied/common/metrics/stats/Rate.java |  106 --
 .../common/metrics/stats/SampledStat.java       |  130 --
 .../copied/common/metrics/stats/Total.java      |   43 -
 .../common/network/ByteBufferReceive.java       |   57 -
 .../copied/common/network/ByteBufferSend.java   |   62 -
 .../common/network/InvalidReceiveException.java |   30 -
 .../kafka/copied/common/network/MultiSend.java  |  100 --
 .../copied/common/network/NetworkReceive.java   |  119 --
 .../copied/common/network/NetworkSend.java      |   42 -
 .../kafka/copied/common/network/Receive.java    |   45 -
 .../kafka/copied/common/network/Selectable.java |  106 --
 .../kafka/copied/common/network/Selector.java   |  655 ----------
 .../kafka/copied/common/network/Send.java       |   47 -
 .../kafka/copied/common/protocol/ApiKeys.java   |   66 -
 .../kafka/copied/common/protocol/Errors.java    |  163 ---
 .../copied/common/protocol/ProtoUtils.java      |   65 -
 .../kafka/copied/common/protocol/Protocol.java  |  470 --------
 .../common/protocol/SecurityProtocol.java       |   63 -
 .../copied/common/protocol/types/ArrayOf.java   |   79 --
 .../copied/common/protocol/types/Field.java     |   69 --
 .../copied/common/protocol/types/Schema.java    |  159 ---
 .../common/protocol/types/SchemaException.java  |   32 -
 .../copied/common/protocol/types/Struct.java    |  329 -----
 .../copied/common/protocol/types/Type.java      |  250 ----
 .../common/record/ByteBufferInputStream.java    |   49 -
 .../common/record/ByteBufferOutputStream.java   |   57 -
 .../copied/common/record/CompressionType.java   |   63 -
 .../kafka/copied/common/record/Compressor.java  |  270 -----
 .../common/record/InvalidRecordException.java   |   27 -
 .../common/record/KafkaLZ4BlockInputStream.java |  233 ----
 .../record/KafkaLZ4BlockOutputStream.java       |  391 ------
 .../kafka/copied/common/record/LogEntry.java    |   48 -
 .../copied/common/record/MemoryRecords.java     |  271 -----
 .../kafka/copied/common/record/Record.java      |  344 ------
 .../kafka/copied/common/record/Records.java     |   45 -
 .../copied/common/requests/AbstractRequest.java |   62 -
 .../requests/AbstractRequestResponse.java       |   66 -
 .../requests/ConsumerMetadataRequest.java       |   65 -
 .../requests/ConsumerMetadataResponse.java      |   70 --
 .../copied/common/requests/FetchRequest.java    |  165 ---
 .../copied/common/requests/FetchResponse.java   |  125 --
 .../common/requests/HeartbeatRequest.java       |   81 --
 .../common/requests/HeartbeatResponse.java      |   55 -
 .../common/requests/JoinGroupRequest.java       |  112 --
 .../common/requests/JoinGroupResponse.java      |  113 --
 .../common/requests/ListOffsetRequest.java      |  142 ---
 .../common/requests/ListOffsetResponse.java     |  118 --
 .../copied/common/requests/MetadataRequest.java |   80 --
 .../common/requests/MetadataResponse.java       |  177 ---
 .../common/requests/OffsetCommitRequest.java    |  266 -----
 .../common/requests/OffsetCommitResponse.java   |  100 --
 .../common/requests/OffsetFetchRequest.java     |  123 --
 .../common/requests/OffsetFetchResponse.java    |  126 --
 .../copied/common/requests/ProduceRequest.java  |  132 --
 .../copied/common/requests/ProduceResponse.java |  122 --
 .../copied/common/requests/RequestHeader.java   |   80 --
 .../copied/common/requests/RequestSend.java     |   55 -
 .../copied/common/requests/ResponseHeader.java  |   55 -
 .../copied/common/requests/ResponseSend.java    |   41 -
 .../serialization/ByteArrayDeserializer.java    |   34 -
 .../serialization/ByteArraySerializer.java      |   34 -
 .../common/serialization/Deserializer.java      |   44 -
 .../serialization/IntegerDeserializer.java      |   44 -
 .../common/serialization/IntegerSerializer.java |   38 -
 .../copied/common/serialization/Serializer.java |   49 -
 .../serialization/StringDeserializer.java       |   53 -
 .../common/serialization/StringSerializer.java  |   53 -
 .../copied/common/utils/AbstractIterator.java   |   88 --
 .../copied/common/utils/CollectionUtils.java    |   62 -
 .../copied/common/utils/CopyOnWriteMap.java     |  142 ---
 .../apache/kafka/copied/common/utils/Crc32.java |  387 ------
 .../kafka/copied/common/utils/KafkaThread.java  |   35 -
 .../kafka/copied/common/utils/SystemTime.java   |   43 -
 .../apache/kafka/copied/common/utils/Time.java  |   39 -
 .../apache/kafka/copied/common/utils/Utils.java |  497 --------
 tools/maven/suppressions.xml                    |    4 +-
 342 files changed, 20627 insertions(+), 19160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
new file mode 100644
index 0000000..d86ea96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.requests.RequestSend;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A request being sent to the server. This holds both the network send as well as the client-level metadata.
+ */
+public final class ClientRequest {
+
+    private final long createdMs;
+    private final boolean expectResponse;
+    private final RequestSend request;
+    private final RequestCompletionHandler callback;
+
+    /**
+     * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
+     * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+     * @param request The request
+     * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
+     */
+    public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
+        this.createdMs = createdMs;
+        this.callback = callback;
+        this.request = request;
+        this.expectResponse = expectResponse;
+    }
+
+    @Override
+    public String toString() {
+        return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
+                + ")";
+    }
+
+    public boolean expectResponse() {
+        return expectResponse;
+    }
+
+    public RequestSend request() {
+        return request;
+    }
+
+    public boolean hasCallback() {
+        return callback != null;
+    }
+
+    public RequestCompletionHandler callback() {
+        return callback;
+    }
+
+    public long createdTime() {
+        return createdMs;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
new file mode 100644
index 0000000..49a7540
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A response from the server. Contains both the body of the response as well as the correlated request that was
+ * originally sent.
+ */
+public class ClientResponse {
+
+    private final long received;
+    private final boolean disconnected;
+    private final ClientRequest request;
+    private final Struct responseBody;
+
+    /**
+     * @param request The original request
+     * @param received The unix timestamp when this response was received
+     * @param disconnected Whether the client disconnected before fully reading a response
+     * @param responseBody The response contents (or null) if we disconnected or no response was expected
+     */
+    public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) {
+        super();
+        this.received = received;
+        this.disconnected = disconnected;
+        this.request = request;
+        this.responseBody = responseBody;
+    }
+
+    public long receivedTime() {
+        return received;
+    }
+
+    public boolean wasDisconnected() {
+        return disconnected;
+    }
+
+    public ClientRequest request() {
+        return request;
+    }
+
+    public Struct responseBody() {
+        return responseBody;
+    }
+
+    public boolean hasResponse() {
+        return responseBody != null;
+    }
+
+    public long requestLatencyMs() {
+        return receivedTime() - this.request.createdTime();
+    }
+
+    @Override
+    public String toString() {
+        return "ClientResponse(received=" + received +
+               ", disconnected=" +
+               disconnected +
+               ", request=" +
+               request +
+               ", responseBody=" +
+               responseBody +
+               ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
new file mode 100644
index 0000000..1a00a78
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.config.ConfigException;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ClientUtils {
+    private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
+
+    public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+        List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+        for (String url : urls) {
+            if (url != null && url.length() > 0) {
+                String host = Utils.getHost(url);
+                Integer port = Utils.getPort(url);
+                if (host == null || port == null)
+                    throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                try {
+                    InetSocketAddress address = new InetSocketAddress(host, port);
+                    if (address.isUnresolved())
+                        throw new ConfigException("DNS resolution failed for url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                    addresses.add(address);
+                } catch (NumberFormatException e) {
+                    throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                }
+            }
+        }
+        if (addresses.size() < 1)
+            throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        return addresses;
+    }
+
+    public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (Throwable t) {
+                firstException.compareAndSet(null, t);
+                log.error("Failed to close " + name, t);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
new file mode 100644
index 0000000..395164b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The state of our connection to each node in the cluster.
+ * 
+ */
+final class ClusterConnectionStates {
+    private final long reconnectBackoffMs;
+    private final Map<String, NodeConnectionState> nodeState;
+
+    public ClusterConnectionStates(long reconnectBackoffMs) {
+        this.reconnectBackoffMs = reconnectBackoffMs;
+        this.nodeState = new HashMap<String, NodeConnectionState>();
+    }
+
+    /**
+     * Return true iff we can currently initiate a new connection. This will be the case if we are not
+     * connected and haven't been connected for at least the minimum reconnection backoff period.
+     * @param id The connection id to check
+     * @param now The current time in MS
+     * @return true if we can initiate a new connection
+     */
+    public boolean canConnect(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
+        if (state == null)
+            return true;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+    }
+
+    /**
+     * Return true if we are disconnected from the given node and can't re-establish a connection yet
+     * @param id The connection to check
+     * @param now The current time in ms
+     */
+    public boolean isBlackedOut(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
+        if (state == null)
+            return false;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < this.reconnectBackoffMs;
+    }
+
+    /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * @param id The connection to check
+     * @param now The current time in ms
+     */
+    public long connectionDelay(String id, long now) {
+        NodeConnectionState state = nodeState.get(id);
+        if (state == null) return 0;
+        long timeWaited = now - state.lastConnectAttemptMs;
+        if (state.state == ConnectionState.DISCONNECTED) {
+            return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+        } else {
+            // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+            // data acked) will cause a wakeup once data can be sent.
+            return Long.MAX_VALUE;
+        }
+    }
+
+    /**
+     * Enter the connecting state for the given connection.
+     * @param id The id of the connection
+     * @param now The current time.
+     */
+    public void connecting(String id, long now) {
+        nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
+    }
+
+    /**
+     * Return true iff a specific connection is connected
+     * @param id The id of the connection to check
+     */
+    public boolean isConnected(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state == ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Return true iff we are in the process of connecting
+     * @param id The id of the connection
+     */
+    public boolean isConnecting(String id) {
+        NodeConnectionState state = nodeState.get(id);
+        return state != null && state.state == ConnectionState.CONNECTING;
+    }
+
+    /**
+     * Enter the connected state for the given connection
+     * @param id The connection identifier
+     */
+    public void connected(String id) {
+        NodeConnectionState nodeState = nodeState(id);
+        nodeState.state = ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Enter the disconnected state for the given node
+     * @param id The connection we have disconnected
+     */
+    public void disconnected(String id) {
+        NodeConnectionState nodeState = nodeState(id);
+        nodeState.state = ConnectionState.DISCONNECTED;
+    }
+    
+    /**
+     * Get the state of a given connection
+     * @param id The id of the connection
+     * @return The state of our connection
+     */
+    public ConnectionState connectionState(String id) {
+        return nodeState(id).state;
+    }
+    
+    /**
+     * Get the state of a given node
+     * @param id The connection to fetch the state for
+     */
+    private NodeConnectionState nodeState(String id) {
+        NodeConnectionState state = this.nodeState.get(id);
+        if (state == null)
+            throw new IllegalStateException("No entry found for connection " + id);
+        return state;
+    }
+    
+    /**
+     * The state of our connection to a node
+     */
+    private static class NodeConnectionState {
+
+        ConnectionState state;
+        long lastConnectAttemptMs;
+
+        public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
+            this.state = state;
+            this.lastConnectAttemptMs = lastConnectAttempt;
+        }
+
+        public String toString() {
+            return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
new file mode 100644
index 0000000..01e9da2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Some configurations shared by both producer and consumer
+ */
+public class CommonClientConfigs {
+    
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    public static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping&mdash;this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+                                                       + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
+                                                       + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+                                                       + "servers (you may want more than one, though, in case a server is down).";
+    
+    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
+    public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
+    
+    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+    public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
+
+    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+    public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";
+
+    public static final String CLIENT_ID_CONFIG = "client.id";
+    public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";
+
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+    public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.";
+
+    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+    public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";
+    
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
+    public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics.";
+
+    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
+    public static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+    public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
+    public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
new file mode 100644
index 0000000..34fd7e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The states of a node connection
+ */
+public enum ConnectionState {
+    DISCONNECTED, CONNECTING, CONNECTED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
new file mode 100644
index 0000000..9fc489b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The set of requests which have been sent or are being sent but haven't yet received a response
+ */
+final class InFlightRequests {
+
+    private final int maxInFlightRequestsPerConnection;
+    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
+
+    public InFlightRequests(int maxInFlightRequestsPerConnection) {
+        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+    }
+
+    /**
+     * Add the given request to the queue for the connection it was directed to
+     */
+    public void add(ClientRequest request) {
+        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
+        if (reqs == null) {
+            reqs = new ArrayDeque<ClientRequest>();
+            this.requests.put(request.request().destination(), reqs);
+        }
+        reqs.addFirst(request);
+    }
+
+    /**
+     * Get the request queue for the given node
+     */
+    private Deque<ClientRequest> requestQueue(String node) {
+        Deque<ClientRequest> reqs = requests.get(node);
+        if (reqs == null || reqs.isEmpty())
+            throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+        return reqs;
+    }
+
+    /**
+     * Get the oldest request (the one that that will be completed next) for the given node
+     */
+    public ClientRequest completeNext(String node) {
+        return requestQueue(node).pollLast();
+    }
+
+    /**
+     * Get the last request we sent to the given node (but don't remove it from the queue)
+     * @param node The node id
+     */
+    public ClientRequest lastSent(String node) {
+        return requestQueue(node).peekFirst();
+    }
+
+    /**
+     * Complete the last request that was sent to a particular node.
+     * @param node The node the request was sent to
+     * @return The request
+     */
+    public ClientRequest completeLastSent(String node) {
+        return requestQueue(node).pollFirst();
+    }
+
+    /**
+     * Can we send more requests to this node?
+     * 
+     * @param node Node in question
+     * @return true iff we have no requests still being sent to the given node
+     */
+    public boolean canSendMore(String node) {
+        Deque<ClientRequest> queue = requests.get(node);
+        return queue == null || queue.isEmpty() ||
+               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
+    }
+
+    /**
+     * Return the number of inflight requests directed at the given node
+     * @param node The node
+     * @return The request count.
+     */
+    public int inFlightRequestCount(String node) {
+        Deque<ClientRequest> queue = requests.get(node);
+        return queue == null ? 0 : queue.size();
+    }
+
+    /**
+     * Count all in-flight requests for all nodes
+     */
+    public int inFlightRequestCount() {
+        int total = 0;
+        for (Deque<ClientRequest> deque : this.requests.values())
+            total += deque.size();
+        return total;
+    }
+
+    /**
+     * Clear out all the in-flight requests for the given node and return them
+     * 
+     * @param node The node
+     * @return All the in-flight requests for that node that have been removed
+     */
+    public Iterable<ClientRequest> clearAll(String node) {
+        Deque<ClientRequest> reqs = requests.get(node);
+        if (reqs == null) {
+            return Collections.emptyList();
+        } else {
+            return requests.remove(node);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
new file mode 100644
index 0000000..2c3d733
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.requests.RequestHeader;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+
+import java.io.Closeable;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The interface for {@link NetworkClient}
+ */
+public interface KafkaClient extends Closeable {
+
+    /**
+     * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
+     * aren't.
+     * 
+     * @param node The node to check
+     * @param now The current timestamp
+     */
+    public boolean isReady(Node node, long now);
+
+    /**
+     * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
+     * node will change only when poll is invoked.
+     * 
+     * @param node The node to connect to.
+     * @param now The current time
+     * @return true iff we are ready to immediately initiate the sending of another request to the given node.
+     */
+    public boolean ready(Node node, long now);
+
+    /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * 
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return The number of milliseconds to wait.
+     */
+    public long connectionDelay(Node node, long now);
+
+    /**
+     * Check if the connection of the node has failed, based on the connection state. Such connection failure are
+     * usually transient and can be resumed in the next {@link #ready(Node, long)} }
+     * call, but there are cases where transient failures needs to be caught and re-acted upon.
+     *
+     * @param node the node to check
+     * @return true iff the connection has failed and the node is disconnected
+     */
+    public boolean connectionFailed(Node node);
+
+    /**
+     * Queue up the given request for sending. Requests can only be sent on ready connections.
+     * 
+     * @param request The request
+     */
+    public void send(ClientRequest request);
+
+    /**
+     * Do actual reads and writes from sockets.
+     * 
+     * @param timeout The maximum amount of time to wait for responses in ms
+     * @param now The current time in ms
+     * @throws IllegalStateException If a request is sent to an unready node
+     */
+    public List<ClientResponse> poll(long timeout, long now);
+
+    /**
+     * Complete all in-flight requests for a given connection
+     * 
+     * @param id The connection to complete requests for
+     * @param now The current time in ms
+     * @return All requests that complete during this time period.
+     */
+    public List<ClientResponse> completeAll(String id, long now);
+
+    /**
+     * Complete all in-flight requests
+     * 
+     * @param now The current time in ms
+     * @return All requests that complete during this time period.
+     */
+    public List<ClientResponse> completeAll(long now);
+
+    /**
+     * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
+     * but will potentially choose a node for which we don't yet have a connection if all existing connections are in
+     * use.
+     * 
+     * @param now The current time in ms
+     * @return The node with the fewest in-flight requests.
+     */
+    public Node leastLoadedNode(long now);
+
+    /**
+     * The number of currently in-flight requests for which we have not yet returned a response
+     */
+    public int inFlightRequestCount();
+
+    /**
+     * Get the total in-flight requests for a particular node
+     * 
+     * @param nodeId The id of the node
+     */
+    public int inFlightRequestCount(String nodeId);
+
+    /**
+     * Generate a request header for the next request
+     * 
+     * @param key The API key of the request
+     */
+    public RequestHeader nextRequestHeader(ApiKeys key);
+
+    /**
+     * Wake up the client if it is currently blocked waiting for I/O
+     */
+    public void wakeup();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
new file mode 100644
index 0000000..1d228f7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ * 
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
+ * topic we don't have any metadata for it will trigger a metadata update.
+ */
+public final class Metadata {
+
+    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
+    private final long refreshBackoffMs;
+    private final long metadataExpireMs;
+    private int version;
+    private long lastRefreshMs;
+    private long lastSuccessfulRefreshMs;
+    private Cluster cluster;
+    private boolean needUpdate;
+    private final Set<String> topics;
+
+    /**
+     * Create a metadata instance with reasonable defaults
+     */
+    public Metadata() {
+        this(100L, 60 * 60 * 1000L);
+    }
+
+    /**
+     * Create a new Metadata instance
+     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+     *        polling
+     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+     */
+    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+        this.refreshBackoffMs = refreshBackoffMs;
+        this.metadataExpireMs = metadataExpireMs;
+        this.lastRefreshMs = 0L;
+        this.lastSuccessfulRefreshMs = 0L;
+        this.version = 0;
+        this.cluster = Cluster.empty();
+        this.needUpdate = false;
+        this.topics = new HashSet<String>();
+    }
+
+    /**
+     * Get the current cluster info without blocking
+     */
+    public synchronized Cluster fetch() {
+        return this.cluster;
+    }
+
+    /**
+     * Add the topic to maintain in the metadata
+     */
+    public synchronized void add(String topic) {
+        topics.add(topic);
+    }
+
+    /**
+     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+     * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
+     * is now
+     */
+    public synchronized long timeToNextUpdate(long nowMs) {
+        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
+        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+        return Math.max(timeToExpire, timeToAllowUpdate);
+    }
+
+    /**
+     * Request an update of the current cluster metadata info, return the current version before the update
+     */
+    public synchronized int requestUpdate() {
+        this.needUpdate = true;
+        return this.version;
+    }
+
+    /**
+     * Wait for metadata update until the current version is larger than the last version we know of
+     */
+    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
+        if (maxWaitMs < 0) {
+            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+        }
+        long begin = System.currentTimeMillis();
+        long remainingWaitMs = maxWaitMs;
+        while (this.version <= lastVersion) {
+            if (remainingWaitMs != 0)
+                wait(remainingWaitMs);
+            long elapsed = System.currentTimeMillis() - begin;
+            if (elapsed >= maxWaitMs)
+                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+            remainingWaitMs = maxWaitMs - elapsed;
+        }
+    }
+
+    /**
+     * Add one or more topics to maintain metadata for
+     */
+    public synchronized void addTopics(String... topics) {
+        for (String topic : topics)
+            this.topics.add(topic);
+        requestUpdate();
+    }
+
+    /**
+     * Get the list of topics we are currently maintaining metadata for
+     */
+    public synchronized Set<String> topics() {
+        return new HashSet<String>(this.topics);
+    }
+
+    /**
+     * Check if a topic is already in the topic set.
+     * @param topic topic to check
+     * @return true if the topic exists, false otherwise
+     */
+    public synchronized boolean containsTopic(String topic) {
+        return this.topics.contains(topic);
+    }
+
+    /**
+     * Update the cluster metadata
+     */
+    public synchronized void update(Cluster cluster, long now) {
+        this.needUpdate = false;
+        this.lastRefreshMs = now;
+        this.lastSuccessfulRefreshMs = now;
+        this.version += 1;
+        this.cluster = cluster;
+        notifyAll();
+        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+    }
+    
+    /**
+     * Record an attempt to update the metadata that failed. We need to keep track of this
+     * to avoid retrying immediately.
+     */
+    public synchronized void failedUpdate(long now) {
+        this.lastRefreshMs = now;
+    }
+    
+    /**
+     * @return The current metadata version
+     */
+    public synchronized int version() {
+        return this.version;
+    }
+
+    /**
+     * The last time metadata was successfully updated.
+     */
+    public synchronized long lastSuccessfulUpdate() {
+        return this.lastSuccessfulRefreshMs;
+    }
+
+    /**
+     * The metadata refresh backoff in ms
+     */
+    public long refreshBackoff() {
+        return refreshBackoffMs;
+    }
+}