You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:41 UTC
[24/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
[FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/33f4c818
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/33f4c818
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/33f4c818
Branch: refs/heads/master
Commit: 33f4c818dd81d259f5b6c06f5caeda0376c40750
Parents: fb5aac2
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 11 16:48:26 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Aug 27 12:40:38 2015 +0200
----------------------------------------------------------------------
.../kafka_backport/clients/ClientRequest.java | 75 ++
.../kafka_backport/clients/ClientResponse.java | 87 ++
.../kafka_backport/clients/ClientUtils.java | 71 ++
.../clients/ClusterConnectionStates.java | 170 +++
.../clients/CommonClientConfigs.java | 69 ++
.../kafka_backport/clients/ConnectionState.java | 29 +
.../clients/InFlightRequests.java | 135 +++
.../kafka_backport/clients/KafkaClient.java | 143 +++
.../flink/kafka_backport/clients/Metadata.java | 195 +++
.../kafka_backport/clients/NetworkClient.java | 528 ++++++++
.../clients/RequestCompletionHandler.java | 32 +
.../clients/consumer/CommitType.java | 26 +
.../clients/consumer/Consumer.java | 135 +++
.../consumer/ConsumerCommitCallback.java | 42 +
.../clients/consumer/ConsumerConfig.java | 334 ++++++
.../consumer/ConsumerRebalanceCallback.java | 104 ++
.../clients/consumer/ConsumerRecord.java | 93 ++
.../clients/consumer/ConsumerRecords.java | 126 ++
.../consumer/ConsumerWakeupException.java | 29 +
.../clients/consumer/KafkaConsumer.java | 1130 ++++++++++++++++++
.../clients/consumer/MockConsumer.java | 209 ++++
.../consumer/NoOffsetForPartitionException.java | 38 +
.../clients/consumer/OffsetResetStrategy.java | 26 +
.../internals/ConsumerNetworkClient.java | 296 +++++
.../clients/consumer/internals/Coordinator.java | 791 ++++++++++++
.../clients/consumer/internals/DelayedTask.java | 24 +
.../consumer/internals/DelayedTaskQueue.java | 96 ++
.../clients/consumer/internals/Fetcher.java | 506 ++++++++
.../clients/consumer/internals/Heartbeat.java | 83 ++
.../internals/NoAvailableBrokersException.java | 32 +
.../NoOpConsumerRebalanceCallback.java | 39 +
.../consumer/internals/RequestFuture.java | 211 ++++
.../internals/RequestFutureAdapter.java | 37 +
.../internals/RequestFutureListener.java | 32 +
.../consumer/internals/SendFailedException.java | 36 +
.../internals/StaleMetadataException.java | 31 +
.../consumer/internals/SubscriptionState.java | 242 ++++
.../flink/kafka_backport/common/Cluster.java | 203 ++++
.../kafka_backport/common/Configurable.java | 40 +
.../kafka_backport/common/KafkaException.java | 51 +
.../flink/kafka_backport/common/Metric.java | 43 +
.../flink/kafka_backport/common/MetricName.java | 194 +++
.../flink/kafka_backport/common/Node.java | 113 ++
.../kafka_backport/common/PartitionInfo.java | 104 ++
.../kafka_backport/common/TopicPartition.java | 89 ++
.../common/config/AbstractConfig.java | 185 +++
.../kafka_backport/common/config/ConfigDef.java | 456 +++++++
.../common/config/ConfigException.java | 49 +
.../common/errors/ApiException.java | 60 +
...onsumerCoordinatorNotAvailableException.java | 49 +
.../common/errors/CorruptRecordException.java | 48 +
.../common/errors/DisconnectException.java | 47 +
.../errors/IllegalGenerationException.java | 42 +
.../common/errors/InterruptException.java | 48 +
.../common/errors/InvalidMetadataException.java | 48 +
.../errors/InvalidRequiredAcksException.java | 34 +
.../common/errors/InvalidTopicException.java | 47 +
.../errors/LeaderNotAvailableException.java | 36 +
.../common/errors/NetworkException.java | 48 +
.../NotCoordinatorForConsumerException.java | 49 +
.../NotEnoughReplicasAfterAppendException.java | 39 +
.../errors/NotEnoughReplicasException.java | 49 +
.../errors/NotLeaderForPartitionException.java | 47 +
.../errors/OffsetLoadInProgressException.java | 49 +
.../common/errors/OffsetMetadataTooLarge.java | 46 +
.../errors/OffsetOutOfRangeException.java | 47 +
.../errors/RecordBatchTooLargeException.java | 48 +
.../common/errors/RecordTooLargeException.java | 47 +
.../common/errors/RetriableException.java | 46 +
.../common/errors/SerializationException.java | 55 +
.../common/errors/TimeoutException.java | 47 +
.../errors/UnknownConsumerIdException.java | 42 +
.../common/errors/UnknownServerException.java | 48 +
.../UnknownTopicOrPartitionException.java | 46 +
.../common/metrics/CompoundStat.java | 61 +
.../common/metrics/JmxReporter.java | 225 ++++
.../common/metrics/KafkaMetric.java | 74 ++
.../common/metrics/Measurable.java | 37 +
.../common/metrics/MeasurableStat.java | 38 +
.../common/metrics/MetricConfig.java | 96 ++
.../kafka_backport/common/metrics/Metrics.java | 211 ++++
.../common/metrics/MetricsReporter.java | 50 +
.../kafka_backport/common/metrics/Quota.java | 61 +
.../common/metrics/QuotaViolationException.java | 41 +
.../kafka_backport/common/metrics/Sensor.java | 182 +++
.../kafka_backport/common/metrics/Stat.java | 41 +
.../common/metrics/stats/Avg.java | 54 +
.../common/metrics/stats/Count.java | 50 +
.../common/metrics/stats/Histogram.java | 166 +++
.../common/metrics/stats/Max.java | 50 +
.../common/metrics/stats/Min.java | 50 +
.../common/metrics/stats/Percentile.java | 49 +
.../common/metrics/stats/Percentiles.java | 125 ++
.../common/metrics/stats/Rate.java | 115 ++
.../common/metrics/stats/SampledStat.java | 139 +++
.../common/metrics/stats/Total.java | 52 +
.../common/network/ByteBufferReceive.java | 66 +
.../common/network/ByteBufferSend.java | 71 ++
.../common/network/InvalidReceiveException.java | 39 +
.../common/network/MultiSend.java | 108 ++
.../common/network/NetworkReceive.java | 128 ++
.../common/network/NetworkSend.java | 51 +
.../kafka_backport/common/network/Receive.java | 54 +
.../common/network/Selectable.java | 115 ++
.../kafka_backport/common/network/Selector.java | 664 ++++++++++
.../kafka_backport/common/network/Send.java | 56 +
.../kafka_backport/common/protocol/ApiKeys.java | 75 ++
.../kafka_backport/common/protocol/Errors.java | 172 +++
.../common/protocol/ProtoUtils.java | 74 ++
.../common/protocol/Protocol.java | 474 ++++++++
.../common/protocol/SecurityProtocol.java | 72 ++
.../common/protocol/types/ArrayOf.java | 88 ++
.../common/protocol/types/Field.java | 78 ++
.../common/protocol/types/Schema.java | 168 +++
.../common/protocol/types/SchemaException.java | 41 +
.../common/protocol/types/Struct.java | 338 ++++++
.../common/protocol/types/Type.java | 259 ++++
.../common/record/ByteBufferInputStream.java | 58 +
.../common/record/ByteBufferOutputStream.java | 66 +
.../common/record/CompressionType.java | 72 ++
.../common/record/Compressor.java | 279 +++++
.../common/record/InvalidRecordException.java | 36 +
.../common/record/KafkaLZ4BlockInputStream.java | 236 ++++
.../record/KafkaLZ4BlockOutputStream.java | 400 +++++++
.../kafka_backport/common/record/LogEntry.java | 57 +
.../common/record/MemoryRecords.java | 280 +++++
.../kafka_backport/common/record/Record.java | 352 ++++++
.../kafka_backport/common/record/Records.java | 54 +
.../common/requests/AbstractRequest.java | 71 ++
.../requests/AbstractRequestResponse.java | 75 ++
.../requests/ConsumerMetadataRequest.java | 74 ++
.../requests/ConsumerMetadataResponse.java | 79 ++
.../common/requests/FetchRequest.java | 174 +++
.../common/requests/FetchResponse.java | 134 +++
.../common/requests/HeartbeatRequest.java | 90 ++
.../common/requests/HeartbeatResponse.java | 64 +
.../common/requests/JoinGroupRequest.java | 121 ++
.../common/requests/JoinGroupResponse.java | 122 ++
.../common/requests/ListOffsetRequest.java | 151 +++
.../common/requests/ListOffsetResponse.java | 127 ++
.../common/requests/MetadataRequest.java | 89 ++
.../common/requests/MetadataResponse.java | 186 +++
.../common/requests/OffsetCommitRequest.java | 275 +++++
.../common/requests/OffsetCommitResponse.java | 109 ++
.../common/requests/OffsetFetchRequest.java | 132 ++
.../common/requests/OffsetFetchResponse.java | 135 +++
.../common/requests/ProduceRequest.java | 141 +++
.../common/requests/ProduceResponse.java | 131 ++
.../common/requests/RequestHeader.java | 89 ++
.../common/requests/RequestSend.java | 64 +
.../common/requests/ResponseHeader.java | 62 +
.../common/requests/ResponseSend.java | 50 +
.../serialization/ByteArrayDeserializer.java | 43 +
.../serialization/ByteArraySerializer.java | 43 +
.../common/serialization/Deserializer.java | 53 +
.../serialization/IntegerDeserializer.java | 53 +
.../common/serialization/IntegerSerializer.java | 47 +
.../common/serialization/Serializer.java | 58 +
.../serialization/StringDeserializer.java | 62 +
.../common/serialization/StringSerializer.java | 62 +
.../common/utils/AbstractIterator.java | 97 ++
.../common/utils/CollectionUtils.java | 71 ++
.../common/utils/CopyOnWriteMap.java | 151 +++
.../kafka_backport/common/utils/Crc32.java | 396 ++++++
.../common/utils/KafkaThread.java | 44 +
.../kafka_backport/common/utils/SystemTime.java | 52 +
.../flink/kafka_backport/common/utils/Time.java | 48 +
.../kafka_backport/common/utils/Utils.java | 506 ++++++++
.../flink/kafka_backport/package-info.java | 10 +
.../streaming/connectors/internals/Fetcher.java | 2 +-
.../internals/FlinkKafkaConsumerBase.java | 10 +-
.../connectors/internals/IncludedFetcher.java | 12 +-
.../connectors/internals/LegacyFetcher.java | 6 +-
.../kafka/copied/clients/ClientRequest.java | 66 -
.../kafka/copied/clients/ClientResponse.java | 78 --
.../kafka/copied/clients/ClientUtils.java | 64 -
.../copied/clients/ClusterConnectionStates.java | 161 ---
.../copied/clients/CommonClientConfigs.java | 60 -
.../kafka/copied/clients/ConnectionState.java | 20 -
.../kafka/copied/clients/InFlightRequests.java | 126 --
.../kafka/copied/clients/KafkaClient.java | 134 ---
.../apache/kafka/copied/clients/Metadata.java | 186 ---
.../kafka/copied/clients/NetworkClient.java | 519 --------
.../clients/RequestCompletionHandler.java | 23 -
.../copied/clients/consumer/CommitType.java | 17 -
.../kafka/copied/clients/consumer/Consumer.java | 126 --
.../consumer/ConsumerCommitCallback.java | 33 -
.../copied/clients/consumer/ConsumerConfig.java | 325 -----
.../consumer/ConsumerRebalanceCallback.java | 95 --
.../copied/clients/consumer/ConsumerRecord.java | 84 --
.../clients/consumer/ConsumerRecords.java | 117 --
.../consumer/ConsumerWakeupException.java | 20 -
.../copied/clients/consumer/KafkaConsumer.java | 1121 -----------------
.../copied/clients/consumer/MockConsumer.java | 200 ----
.../consumer/NoOffsetForPartitionException.java | 29 -
.../clients/consumer/OffsetResetStrategy.java | 17 -
.../internals/ConsumerNetworkClient.java | 296 -----
.../clients/consumer/internals/Coordinator.java | 791 ------------
.../clients/consumer/internals/DelayedTask.java | 24 -
.../consumer/internals/DelayedTaskQueue.java | 96 --
.../clients/consumer/internals/Fetcher.java | 498 --------
.../clients/consumer/internals/Heartbeat.java | 74 --
.../internals/NoAvailableBrokersException.java | 23 -
.../NoOpConsumerRebalanceCallback.java | 30 -
.../consumer/internals/RequestFuture.java | 202 ----
.../internals/RequestFutureAdapter.java | 28 -
.../internals/RequestFutureListener.java | 23 -
.../consumer/internals/SendFailedException.java | 27 -
.../internals/StaleMetadataException.java | 22 -
.../consumer/internals/SubscriptionState.java | 233 ----
.../org/apache/kafka/copied/common/Cluster.java | 194 ---
.../kafka/copied/common/Configurable.java | 31 -
.../kafka/copied/common/KafkaException.java | 42 -
.../org/apache/kafka/copied/common/Metric.java | 34 -
.../apache/kafka/copied/common/MetricName.java | 185 ---
.../org/apache/kafka/copied/common/Node.java | 104 --
.../kafka/copied/common/PartitionInfo.java | 95 --
.../kafka/copied/common/TopicPartition.java | 79 --
.../copied/common/config/AbstractConfig.java | 176 ---
.../kafka/copied/common/config/ConfigDef.java | 447 -------
.../copied/common/config/ConfigException.java | 40 -
.../copied/common/errors/ApiException.java | 51 -
...onsumerCoordinatorNotAvailableException.java | 40 -
.../common/errors/CorruptRecordException.java | 39 -
.../common/errors/DisconnectException.java | 39 -
.../errors/IllegalGenerationException.java | 33 -
.../common/errors/InterruptException.java | 39 -
.../common/errors/InvalidMetadataException.java | 39 -
.../errors/InvalidRequiredAcksException.java | 25 -
.../common/errors/InvalidTopicException.java | 38 -
.../errors/LeaderNotAvailableException.java | 27 -
.../copied/common/errors/NetworkException.java | 39 -
.../NotCoordinatorForConsumerException.java | 40 -
.../NotEnoughReplicasAfterAppendException.java | 30 -
.../errors/NotEnoughReplicasException.java | 40 -
.../errors/NotLeaderForPartitionException.java | 38 -
.../errors/OffsetLoadInProgressException.java | 40 -
.../common/errors/OffsetMetadataTooLarge.java | 37 -
.../errors/OffsetOutOfRangeException.java | 38 -
.../errors/RecordBatchTooLargeException.java | 39 -
.../common/errors/RecordTooLargeException.java | 38 -
.../common/errors/RetriableException.java | 37 -
.../common/errors/SerializationException.java | 46 -
.../copied/common/errors/TimeoutException.java | 38 -
.../errors/UnknownConsumerIdException.java | 33 -
.../common/errors/UnknownServerException.java | 39 -
.../UnknownTopicOrPartitionException.java | 37 -
.../copied/common/metrics/CompoundStat.java | 52 -
.../copied/common/metrics/JmxReporter.java | 216 ----
.../copied/common/metrics/KafkaMetric.java | 65 -
.../kafka/copied/common/metrics/Measurable.java | 28 -
.../copied/common/metrics/MeasurableStat.java | 30 -
.../copied/common/metrics/MetricConfig.java | 87 --
.../kafka/copied/common/metrics/Metrics.java | 202 ----
.../copied/common/metrics/MetricsReporter.java | 41 -
.../kafka/copied/common/metrics/Quota.java | 52 -
.../common/metrics/QuotaViolationException.java | 32 -
.../kafka/copied/common/metrics/Sensor.java | 173 ---
.../kafka/copied/common/metrics/Stat.java | 32 -
.../kafka/copied/common/metrics/stats/Avg.java | 45 -
.../copied/common/metrics/stats/Count.java | 41 -
.../copied/common/metrics/stats/Histogram.java | 157 ---
.../kafka/copied/common/metrics/stats/Max.java | 41 -
.../kafka/copied/common/metrics/stats/Min.java | 41 -
.../copied/common/metrics/stats/Percentile.java | 40 -
.../common/metrics/stats/Percentiles.java | 116 --
.../kafka/copied/common/metrics/stats/Rate.java | 106 --
.../common/metrics/stats/SampledStat.java | 130 --
.../copied/common/metrics/stats/Total.java | 43 -
.../common/network/ByteBufferReceive.java | 57 -
.../copied/common/network/ByteBufferSend.java | 62 -
.../common/network/InvalidReceiveException.java | 30 -
.../kafka/copied/common/network/MultiSend.java | 100 --
.../copied/common/network/NetworkReceive.java | 119 --
.../copied/common/network/NetworkSend.java | 42 -
.../kafka/copied/common/network/Receive.java | 45 -
.../kafka/copied/common/network/Selectable.java | 106 --
.../kafka/copied/common/network/Selector.java | 655 ----------
.../kafka/copied/common/network/Send.java | 47 -
.../kafka/copied/common/protocol/ApiKeys.java | 66 -
.../kafka/copied/common/protocol/Errors.java | 163 ---
.../copied/common/protocol/ProtoUtils.java | 65 -
.../kafka/copied/common/protocol/Protocol.java | 470 --------
.../common/protocol/SecurityProtocol.java | 63 -
.../copied/common/protocol/types/ArrayOf.java | 79 --
.../copied/common/protocol/types/Field.java | 69 --
.../copied/common/protocol/types/Schema.java | 159 ---
.../common/protocol/types/SchemaException.java | 32 -
.../copied/common/protocol/types/Struct.java | 329 -----
.../copied/common/protocol/types/Type.java | 250 ----
.../common/record/ByteBufferInputStream.java | 49 -
.../common/record/ByteBufferOutputStream.java | 57 -
.../copied/common/record/CompressionType.java | 63 -
.../kafka/copied/common/record/Compressor.java | 270 -----
.../common/record/InvalidRecordException.java | 27 -
.../common/record/KafkaLZ4BlockInputStream.java | 233 ----
.../record/KafkaLZ4BlockOutputStream.java | 391 ------
.../kafka/copied/common/record/LogEntry.java | 48 -
.../copied/common/record/MemoryRecords.java | 271 -----
.../kafka/copied/common/record/Record.java | 344 ------
.../kafka/copied/common/record/Records.java | 45 -
.../copied/common/requests/AbstractRequest.java | 62 -
.../requests/AbstractRequestResponse.java | 66 -
.../requests/ConsumerMetadataRequest.java | 65 -
.../requests/ConsumerMetadataResponse.java | 70 --
.../copied/common/requests/FetchRequest.java | 165 ---
.../copied/common/requests/FetchResponse.java | 125 --
.../common/requests/HeartbeatRequest.java | 81 --
.../common/requests/HeartbeatResponse.java | 55 -
.../common/requests/JoinGroupRequest.java | 112 --
.../common/requests/JoinGroupResponse.java | 113 --
.../common/requests/ListOffsetRequest.java | 142 ---
.../common/requests/ListOffsetResponse.java | 118 --
.../copied/common/requests/MetadataRequest.java | 80 --
.../common/requests/MetadataResponse.java | 177 ---
.../common/requests/OffsetCommitRequest.java | 266 -----
.../common/requests/OffsetCommitResponse.java | 100 --
.../common/requests/OffsetFetchRequest.java | 123 --
.../common/requests/OffsetFetchResponse.java | 126 --
.../copied/common/requests/ProduceRequest.java | 132 --
.../copied/common/requests/ProduceResponse.java | 122 --
.../copied/common/requests/RequestHeader.java | 80 --
.../copied/common/requests/RequestSend.java | 55 -
.../copied/common/requests/ResponseHeader.java | 55 -
.../copied/common/requests/ResponseSend.java | 41 -
.../serialization/ByteArrayDeserializer.java | 34 -
.../serialization/ByteArraySerializer.java | 34 -
.../common/serialization/Deserializer.java | 44 -
.../serialization/IntegerDeserializer.java | 44 -
.../common/serialization/IntegerSerializer.java | 38 -
.../copied/common/serialization/Serializer.java | 49 -
.../serialization/StringDeserializer.java | 53 -
.../common/serialization/StringSerializer.java | 53 -
.../copied/common/utils/AbstractIterator.java | 88 --
.../copied/common/utils/CollectionUtils.java | 62 -
.../copied/common/utils/CopyOnWriteMap.java | 142 ---
.../apache/kafka/copied/common/utils/Crc32.java | 387 ------
.../kafka/copied/common/utils/KafkaThread.java | 35 -
.../kafka/copied/common/utils/SystemTime.java | 43 -
.../apache/kafka/copied/common/utils/Time.java | 39 -
.../apache/kafka/copied/common/utils/Utils.java | 497 --------
tools/maven/suppressions.xml | 4 +-
342 files changed, 20627 insertions(+), 19160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
new file mode 100644
index 0000000..d86ea96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientRequest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.requests.RequestSend;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A request being sent to the server. This holds both the network send as well as the client-level metadata.
+ */
+public final class ClientRequest {
+
+ private final long createdMs;
+ private final boolean expectResponse;
+ private final RequestSend request;
+ private final RequestCompletionHandler callback;
+
+ /**
+ * @param createdMs The unix timestamp in milliseconds for the time at which this request was created.
+ * @param expectResponse Should we expect a response message or is this request complete once it is sent?
+ * @param request The request
+ * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
+ */
+ public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, RequestCompletionHandler callback) {
+ this.createdMs = createdMs;
+ this.callback = callback;
+ this.request = request;
+ this.expectResponse = expectResponse;
+ }
+
+ @Override
+ public String toString() {
+ return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + ", request=" + request
+ + ")";
+ }
+
+ public boolean expectResponse() {
+ return expectResponse;
+ }
+
+ public RequestSend request() {
+ return request;
+ }
+
+ public boolean hasCallback() {
+ return callback != null;
+ }
+
+ public RequestCompletionHandler callback() {
+ return callback;
+ }
+
+ public long createdTime() {
+ return createdMs;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
new file mode 100644
index 0000000..49a7540
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientResponse.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A response from the server. Contains both the body of the response as well as the correlated request that was
+ * originally sent.
+ */
+public class ClientResponse {
+
+ private final long received;
+ private final boolean disconnected;
+ private final ClientRequest request;
+ private final Struct responseBody;
+
+ /**
+ * @param request The original request
+ * @param received The unix timestamp when this response was received
+ * @param disconnected Whether the client disconnected before fully reading a response
+ * @param responseBody The response contents (or null) if we disconnected or no response was expected
+ */
+ public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) {
+ super();
+ this.received = received;
+ this.disconnected = disconnected;
+ this.request = request;
+ this.responseBody = responseBody;
+ }
+
+ public long receivedTime() {
+ return received;
+ }
+
+ public boolean wasDisconnected() {
+ return disconnected;
+ }
+
+ public ClientRequest request() {
+ return request;
+ }
+
+ public Struct responseBody() {
+ return responseBody;
+ }
+
+ public boolean hasResponse() {
+ return responseBody != null;
+ }
+
+ public long requestLatencyMs() {
+ return receivedTime() - this.request.createdTime();
+ }
+
+ @Override
+ public String toString() {
+ return "ClientResponse(received=" + received +
+ ", disconnected=" +
+ disconnected +
+ ", request=" +
+ request +
+ ", responseBody=" +
+ responseBody +
+ ")";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
new file mode 100644
index 0000000..1a00a78
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClientUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.config.ConfigException;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class ClientUtils {
+ private static final Logger log = LoggerFactory.getLogger(ClientUtils.class);
+
+ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) {
+ List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String url : urls) {
+ if (url != null && url.length() > 0) {
+ String host = Utils.getHost(url);
+ Integer port = Utils.getPort(url);
+ if (host == null || port == null)
+ throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ try {
+ InetSocketAddress address = new InetSocketAddress(host, port);
+ if (address.isUnresolved())
+ throw new ConfigException("DNS resolution failed for url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ addresses.add(address);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+ }
+ }
+ }
+ if (addresses.size() < 1)
+ throw new ConfigException("No bootstrap urls given in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ return addresses;
+ }
+
+ public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
+ if (c != null) {
+ try {
+ c.close();
+ } catch (Throwable t) {
+ firstException.compareAndSet(null, t);
+ log.error("Failed to close " + name, t);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
new file mode 100644
index 0000000..395164b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ClusterConnectionStates.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The state of our connection to each node in the cluster.
+ *
+ */
+final class ClusterConnectionStates {
+ private final long reconnectBackoffMs;
+ private final Map<String, NodeConnectionState> nodeState;
+
+ public ClusterConnectionStates(long reconnectBackoffMs) {
+ this.reconnectBackoffMs = reconnectBackoffMs;
+ this.nodeState = new HashMap<String, NodeConnectionState>();
+ }
+
+ /**
+ * Return true iff we can currently initiate a new connection. This will be the case if we are not
+ * connected and haven't been connected for at least the minimum reconnection backoff period.
+ * @param id The connection id to check
+ * @param now The current time in MS
+ * @return true if we can initiate a new connection
+ */
+ public boolean canConnect(String id, long now) {
+ NodeConnectionState state = nodeState.get(id);
+ if (state == null)
+ return true;
+ else
+ return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+ }
+
+ /**
+ * Return true if we are disconnected from the given node and can't re-establish a connection yet
+ * @param id The connection to check
+ * @param now The current time in ms
+ */
+ public boolean isBlackedOut(String id, long now) {
+ NodeConnectionState state = nodeState.get(id);
+ if (state == null)
+ return false;
+ else
+ return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < this.reconnectBackoffMs;
+ }
+
+ /**
+ * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+ * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+ * connections.
+ * @param id The connection to check
+ * @param now The current time in ms
+ */
+ public long connectionDelay(String id, long now) {
+ NodeConnectionState state = nodeState.get(id);
+ if (state == null) return 0;
+ long timeWaited = now - state.lastConnectAttemptMs;
+ if (state.state == ConnectionState.DISCONNECTED) {
+ return Math.max(this.reconnectBackoffMs - timeWaited, 0);
+ } else {
+ // When connecting or connected, we should be able to delay indefinitely since other events (connection or
+ // data acked) will cause a wakeup once data can be sent.
+ return Long.MAX_VALUE;
+ }
+ }
+
+ /**
+ * Enter the connecting state for the given connection.
+ * @param id The id of the connection
+ * @param now The current time.
+ */
+ public void connecting(String id, long now) {
+ nodeState.put(id, new NodeConnectionState(ConnectionState.CONNECTING, now));
+ }
+
+ /**
+ * Return true iff a specific connection is connected
+ * @param id The id of the connection to check
+ */
+ public boolean isConnected(String id) {
+ NodeConnectionState state = nodeState.get(id);
+ return state != null && state.state == ConnectionState.CONNECTED;
+ }
+
+ /**
+ * Return true iff we are in the process of connecting
+ * @param id The id of the connection
+ */
+ public boolean isConnecting(String id) {
+ NodeConnectionState state = nodeState.get(id);
+ return state != null && state.state == ConnectionState.CONNECTING;
+ }
+
+ /**
+ * Enter the connected state for the given connection
+ * @param id The connection identifier
+ */
+ public void connected(String id) {
+ NodeConnectionState nodeState = nodeState(id);
+ nodeState.state = ConnectionState.CONNECTED;
+ }
+
+ /**
+ * Enter the disconnected state for the given node
+ * @param id The connection we have disconnected
+ */
+ public void disconnected(String id) {
+ NodeConnectionState nodeState = nodeState(id);
+ nodeState.state = ConnectionState.DISCONNECTED;
+ }
+
+ /**
+ * Get the state of a given connection
+ * @param id The id of the connection
+ * @return The state of our connection
+ */
+ public ConnectionState connectionState(String id) {
+ return nodeState(id).state;
+ }
+
+ /**
+ * Get the state of a given node
+ * @param id The connection to fetch the state for
+ */
+ private NodeConnectionState nodeState(String id) {
+ NodeConnectionState state = this.nodeState.get(id);
+ if (state == null)
+ throw new IllegalStateException("No entry found for connection " + id);
+ return state;
+ }
+
+ /**
+ * The state of our connection to a node
+ */
+ private static class NodeConnectionState {
+
+ ConnectionState state;
+ long lastConnectAttemptMs;
+
+ public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
+ this.state = state;
+ this.lastConnectAttemptMs = lastConnectAttempt;
+ }
+
+ public String toString() {
+ return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
new file mode 100644
index 0000000..01e9da2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/CommonClientConfigs.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.clients;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Some configurations shared by both producer and consumer
+ */
+public class CommonClientConfigs {
+
+ /*
+ * NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+ */
+
+ public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+ public static final String BOOSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
+ + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
+ + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
+ + "servers (you may want more than one, though, in case a server is down).";
+
+ public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
+ public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
+
+ public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
+ public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
+
+ public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
+ public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";
+
+ public static final String CLIENT_ID_CONFIG = "client.id";
+ public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";
+
+ public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
+ public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.";
+
+ public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
+ public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";
+
+ public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
+ public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The number of samples maintained to compute metrics.";
+
+ public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
+ public static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
+
+ public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
+ public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
+
+ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
+ public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
new file mode 100644
index 0000000..34fd7e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/ConnectionState.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The states of a node connection
+ */
+public enum ConnectionState {
+ DISCONNECTED, CONNECTING, CONNECTED
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
new file mode 100644
index 0000000..9fc489b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/InFlightRequests.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The set of requests which have been sent or are being sent but haven't yet received a response
+ */
+final class InFlightRequests {
+
+ private final int maxInFlightRequestsPerConnection;
+ private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
+
+ public InFlightRequests(int maxInFlightRequestsPerConnection) {
+ this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
+ }
+
+ /**
+ * Add the given request to the queue for the connection it was directed to
+ */
+ public void add(ClientRequest request) {
+ Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
+ if (reqs == null) {
+ reqs = new ArrayDeque<ClientRequest>();
+ this.requests.put(request.request().destination(), reqs);
+ }
+ reqs.addFirst(request);
+ }
+
+ /**
+ * Get the request queue for the given node
+ */
+ private Deque<ClientRequest> requestQueue(String node) {
+ Deque<ClientRequest> reqs = requests.get(node);
+ if (reqs == null || reqs.isEmpty())
+ throw new IllegalStateException("Response from server for which there are no in-flight requests.");
+ return reqs;
+ }
+
+ /**
+ * Get the oldest request (the one that that will be completed next) for the given node
+ */
+ public ClientRequest completeNext(String node) {
+ return requestQueue(node).pollLast();
+ }
+
+ /**
+ * Get the last request we sent to the given node (but don't remove it from the queue)
+ * @param node The node id
+ */
+ public ClientRequest lastSent(String node) {
+ return requestQueue(node).peekFirst();
+ }
+
+ /**
+ * Complete the last request that was sent to a particular node.
+ * @param node The node the request was sent to
+ * @return The request
+ */
+ public ClientRequest completeLastSent(String node) {
+ return requestQueue(node).pollFirst();
+ }
+
+ /**
+ * Can we send more requests to this node?
+ *
+ * @param node Node in question
+ * @return true iff we have no requests still being sent to the given node
+ */
+ public boolean canSendMore(String node) {
+ Deque<ClientRequest> queue = requests.get(node);
+ return queue == null || queue.isEmpty() ||
+ (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
+ }
+
+ /**
+ * Return the number of inflight requests directed at the given node
+ * @param node The node
+ * @return The request count.
+ */
+ public int inFlightRequestCount(String node) {
+ Deque<ClientRequest> queue = requests.get(node);
+ return queue == null ? 0 : queue.size();
+ }
+
+ /**
+ * Count all in-flight requests for all nodes
+ */
+ public int inFlightRequestCount() {
+ int total = 0;
+ for (Deque<ClientRequest> deque : this.requests.values())
+ total += deque.size();
+ return total;
+ }
+
+ /**
+ * Clear out all the in-flight requests for the given node and return them
+ *
+ * @param node The node
+ * @return All the in-flight requests for that node that have been removed
+ */
+ public Iterable<ClientRequest> clearAll(String node) {
+ Deque<ClientRequest> reqs = requests.get(node);
+ if (reqs == null) {
+ return Collections.emptyList();
+ } else {
+ return requests.remove(node);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
new file mode 100644
index 0000000..2c3d733
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/KafkaClient.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.requests.RequestHeader;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+
+import java.io.Closeable;
+import java.util.List;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The interface for {@link NetworkClient}
+ */
+public interface KafkaClient extends Closeable {
+
+ /**
+ * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
+ * aren't.
+ *
+ * @param node The node to check
+ * @param now The current timestamp
+ */
+ public boolean isReady(Node node, long now);
+
+ /**
+ * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
+ * node will change only when poll is invoked.
+ *
+ * @param node The node to connect to.
+ * @param now The current time
+ * @return true iff we are ready to immediately initiate the sending of another request to the given node.
+ */
+ public boolean ready(Node node, long now);
+
+ /**
+ * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+ * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+ * connections.
+ *
+ * @param node The node to check
+ * @param now The current timestamp
+ * @return The number of milliseconds to wait.
+ */
+ public long connectionDelay(Node node, long now);
+
+ /**
+ * Check if the connection of the node has failed, based on the connection state. Such connection failure are
+ * usually transient and can be resumed in the next {@link #ready(Node, long)} }
+ * call, but there are cases where transient failures needs to be caught and re-acted upon.
+ *
+ * @param node the node to check
+ * @return true iff the connection has failed and the node is disconnected
+ */
+ public boolean connectionFailed(Node node);
+
+ /**
+ * Queue up the given request for sending. Requests can only be sent on ready connections.
+ *
+ * @param request The request
+ */
+ public void send(ClientRequest request);
+
+ /**
+ * Do actual reads and writes from sockets.
+ *
+ * @param timeout The maximum amount of time to wait for responses in ms
+ * @param now The current time in ms
+ * @throws IllegalStateException If a request is sent to an unready node
+ */
+ public List<ClientResponse> poll(long timeout, long now);
+
+ /**
+ * Complete all in-flight requests for a given connection
+ *
+ * @param id The connection to complete requests for
+ * @param now The current time in ms
+ * @return All requests that complete during this time period.
+ */
+ public List<ClientResponse> completeAll(String id, long now);
+
+ /**
+ * Complete all in-flight requests
+ *
+ * @param now The current time in ms
+ * @return All requests that complete during this time period.
+ */
+ public List<ClientResponse> completeAll(long now);
+
+ /**
+ * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
+ * but will potentially choose a node for which we don't yet have a connection if all existing connections are in
+ * use.
+ *
+ * @param now The current time in ms
+ * @return The node with the fewest in-flight requests.
+ */
+ public Node leastLoadedNode(long now);
+
+ /**
+ * The number of currently in-flight requests for which we have not yet returned a response
+ */
+ public int inFlightRequestCount();
+
+ /**
+ * Get the total in-flight requests for a particular node
+ *
+ * @param nodeId The id of the node
+ */
+ public int inFlightRequestCount(String nodeId);
+
+ /**
+ * Generate a request header for the next request
+ *
+ * @param key The API key of the request
+ */
+ public RequestHeader nextRequestHeader(ApiKeys key);
+
+ /**
+ * Wake up the client if it is currently blocked waiting for I/O
+ */
+ public void wakeup();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
new file mode 100644
index 0000000..1d228f7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/Metadata.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.errors.TimeoutException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A class encapsulating some of the logic around metadata.
+ * <p>
+ * This class is shared by the client thread (for partitioning) and the background sender thread.
+ *
+ * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a
+ * topic we don't have any metadata for it will trigger a metadata update.
+ */
+public final class Metadata {
+
+ private static final Logger log = LoggerFactory.getLogger(Metadata.class);
+
+ private final long refreshBackoffMs;
+ private final long metadataExpireMs;
+ private int version;
+ private long lastRefreshMs;
+ private long lastSuccessfulRefreshMs;
+ private Cluster cluster;
+ private boolean needUpdate;
+ private final Set<String> topics;
+
+ /**
+ * Create a metadata instance with reasonable defaults
+ */
+ public Metadata() {
+ this(100L, 60 * 60 * 1000L);
+ }
+
+ /**
+ * Create a new Metadata instance
+ * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+ * polling
+ * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ */
+ public Metadata(long refreshBackoffMs, long metadataExpireMs) {
+ this.refreshBackoffMs = refreshBackoffMs;
+ this.metadataExpireMs = metadataExpireMs;
+ this.lastRefreshMs = 0L;
+ this.lastSuccessfulRefreshMs = 0L;
+ this.version = 0;
+ this.cluster = Cluster.empty();
+ this.needUpdate = false;
+ this.topics = new HashSet<String>();
+ }
+
+ /**
+ * Get the current cluster info without blocking
+ */
+ public synchronized Cluster fetch() {
+ return this.cluster;
+ }
+
+ /**
+ * Add the topic to maintain in the metadata
+ */
+ public synchronized void add(String topic) {
+ topics.add(topic);
+ }
+
+ /**
+ * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
+ * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
+ * is now
+ */
+ public synchronized long timeToNextUpdate(long nowMs) {
+ long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
+ long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+ return Math.max(timeToExpire, timeToAllowUpdate);
+ }
+
+ /**
+ * Request an update of the current cluster metadata info, return the current version before the update
+ */
+ public synchronized int requestUpdate() {
+ this.needUpdate = true;
+ return this.version;
+ }
+
+ /**
+ * Wait for metadata update until the current version is larger than the last version we know of
+ */
+ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
+ if (maxWaitMs < 0) {
+ throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
+ }
+ long begin = System.currentTimeMillis();
+ long remainingWaitMs = maxWaitMs;
+ while (this.version <= lastVersion) {
+ if (remainingWaitMs != 0)
+ wait(remainingWaitMs);
+ long elapsed = System.currentTimeMillis() - begin;
+ if (elapsed >= maxWaitMs)
+ throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
+ remainingWaitMs = maxWaitMs - elapsed;
+ }
+ }
+
+ /**
+ * Add one or more topics to maintain metadata for
+ */
+ public synchronized void addTopics(String... topics) {
+ for (String topic : topics)
+ this.topics.add(topic);
+ requestUpdate();
+ }
+
+ /**
+ * Get the list of topics we are currently maintaining metadata for
+ */
+ public synchronized Set<String> topics() {
+ return new HashSet<String>(this.topics);
+ }
+
+ /**
+ * Check if a topic is already in the topic set.
+ * @param topic topic to check
+ * @return true if the topic exists, false otherwise
+ */
+ public synchronized boolean containsTopic(String topic) {
+ return this.topics.contains(topic);
+ }
+
+ /**
+ * Update the cluster metadata
+ */
+ public synchronized void update(Cluster cluster, long now) {
+ this.needUpdate = false;
+ this.lastRefreshMs = now;
+ this.lastSuccessfulRefreshMs = now;
+ this.version += 1;
+ this.cluster = cluster;
+ notifyAll();
+ log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
+ }
+
+ /**
+ * Record an attempt to update the metadata that failed. We need to keep track of this
+ * to avoid retrying immediately.
+ */
+ public synchronized void failedUpdate(long now) {
+ this.lastRefreshMs = now;
+ }
+
+ /**
+ * @return The current metadata version
+ */
+ public synchronized int version() {
+ return this.version;
+ }
+
+ /**
+ * The last time metadata was successfully updated.
+ */
+ public synchronized long lastSuccessfulUpdate() {
+ return this.lastSuccessfulRefreshMs;
+ }
+
+ /**
+ * The metadata refresh backoff in ms
+ */
+ public long refreshBackoff() {
+ return refreshBackoffMs;
+ }
+}