You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:33 UTC
[16/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add
comments to all backported kafka sources and move them to
'org.apache.flink.kafka_backport'
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
new file mode 100644
index 0000000..f7c8981
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import org.apache.flink.kafka_backport.common.protocol.types.ArrayOf;
+import org.apache.flink.kafka_backport.common.protocol.types.Field;
+import org.apache.flink.kafka_backport.common.protocol.types.Schema;
+import org.apache.flink.kafka_backport.common.protocol.types.Type;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public class Protocol {
+
+ public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", Type.INT16, "The id of the request type."),
+ new Field("api_version", Type.INT16, "The version of the API."),
+ new Field("correlation_id",
+ Type.INT32,
+ "A user-supplied integer value that will be passed back with the response"),
+ new Field("client_id",
+ Type.STRING,
+ "A user specified identifier for the client making the request."));
+
+ public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+ Type.INT32,
+ "The user-supplied value passed in with the request"));
+
+ /* Metadata api */
+
+ public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+ new ArrayOf(Type.STRING),
+ "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+
+ public static final Schema BROKER = new Schema(new Field("node_id", Type.INT32, "The broker id."),
+ new Field("host", Type.STRING, "The hostname of the broker."),
+ new Field("port",
+ Type.INT32,
+ "The port on which the broker accepts requests."));
+
+ public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+ Type.INT16,
+ "The error code for the partition, if any."),
+ new Field("partition_id",
+ Type.INT32,
+ "The id of the partition."),
+ new Field("leader",
+ Type.INT32,
+ "The id of the broker acting as leader for this partition."),
+ new Field("replicas",
+ new ArrayOf(Type.INT32),
+ "The set of all nodes that host this partition."),
+ new Field("isr",
+ new ArrayOf(Type.INT32),
+ "The set of nodes that are in sync with the leader for this partition."));
+
+ public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
+ Type.INT16,
+ "The error code for the given topic."),
+ new Field("topic", Type.STRING, "The name of the topic"),
+ new Field("partition_metadata",
+ new ArrayOf(PARTITION_METADATA_V0),
+ "Metadata for each partition of the topic."));
+
+ public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+ new ArrayOf(BROKER),
+ "Host and port information for all brokers."),
+ new Field("topic_metadata",
+ new ArrayOf(TOPIC_METADATA_V0)));
+
+ public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
+ public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
+
+ /* Produce api */
+
+ public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("data", new ArrayOf(new Schema(new Field("partition", Type.INT32),
+ new Field("record_set", Type.BYTES)))));
+
+ public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+ Type.INT16,
+ "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+ new Field("timeout", Type.INT32, "The time to await a response in ms."),
+ new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+ public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(new Schema(new Field("topic", Type.STRING),
+ new Field("partition_responses",
+ new ArrayOf(new Schema(new Field("partition",
+ Type.INT32),
+ new Field("error_code",
+ Type.INT16),
+ new Field("base_offset",
+ Type.INT64))))))));
+
+ public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
+ public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
+
+ /* Offset commit api */
+ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("offset",
+ Type.INT64,
+ "Message offset to be committed."),
+ new Field("metadata",
+ Type.STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("offset",
+ Type.INT64,
+ "Message offset to be committed."),
+ new Field("timestamp",
+ Type.INT64,
+ "Timestamp of the commit"),
+ new Field("metadata",
+ Type.STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("offset",
+ Type.INT64,
+ "Message offset to be committed."),
+ new Field("metadata",
+ Type.STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ Type.STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+ "Partitions to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+ Type.STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+ "Partitions to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
+ Type.STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
+ "Partitions to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."),
+ new Field("group_generation_id",
+ Type.INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ Type.STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
+ "Topics to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."),
+ new Field("group_generation_id",
+ Type.INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ Type.STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("retention_time",
+ Type.INT64,
+ "Time period in ms to retain the offset."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
+ "Topics to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("error_code",
+ Type.INT16));
+
+ public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
+
+ public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
+
+ /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
+ public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+ public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
+
+ public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
+
+ /* Offset fetch api */
+
+ /*
+ * Wire formats of version 0 and 1 are the same, but with different functionality.
+ * Version 0 will read the offsets from ZK;
+ * Version 1 will read the offsets from Kafka.
+ */
+ public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ Type.STRING,
+ "Topic to fetch offset."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("offset",
+ Type.INT64,
+ "Last committed message offset."),
+ new Field("metadata",
+ Type.STRING,
+ "Any associated metadata the client wants to keep."),
+ new Field("error_code", Type.INT16));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+ public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+ public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+ public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
+ public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
+
+ /* List offset api */
+ public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("timestamp", Type.INT64, "Timestamp."),
+ new Field("max_num_offsets",
+ Type.INT32,
+ "Maximum offsets to return."));
+
+ public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ Type.STRING,
+ "Topic to list offset."),
+ new Field("partitions",
+ new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+ "Partitions to list offset."));
+
+ public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ Type.INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("topics",
+ new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+ "Topics to list offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("error_code", Type.INT16),
+ new Field("offsets",
+ new ArrayOf(Type.INT64),
+ "A list of offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("partition_responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+ public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
+ public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+
+ /* Fetch api */
+ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("fetch_offset",
+ Type.INT64,
+ "Message offset."),
+ new Field("max_bytes",
+ Type.INT32,
+ "Maximum bytes to fetch."));
+
+ public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", Type.STRING, "Topic to fetch."),
+ new Field("partitions",
+ new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch."));
+
+ public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
+ Type.INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ Type.INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ Type.INT32,
+ "Minimum bytes to accumulate in the response."),
+ new Field("topics",
+ new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch."));
+
+ public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ Type.INT32,
+ "Topic partition id."),
+ new Field("error_code", Type.INT16),
+ new Field("high_watermark",
+ Type.INT64,
+ "Last committed offset."),
+ new Field("record_set", Type.BYTES));
+
+ public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("partition_responses",
+ new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+
+ public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
+
+ /* Consumer metadata api */
+ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."));
+
+ public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
+
+ public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
+ public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
+
+ /* Join group api */
+ public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+ Type.STRING,
+ "The consumer group id."),
+ new Field("session_timeout",
+ Type.INT32,
+ "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+ new Field("topics",
+ new ArrayOf(Type.STRING),
+ "An array of topics to subscribe to."),
+ new Field("consumer_id",
+ Type.STRING,
+ "The assigned consumer id or an empty string for a new consumer."),
+ new Field("partition_assignment_strategy",
+ Type.STRING,
+ "The strategy for the coordinator to assign partitions."));
+
+ public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
+ new Field("partitions", new ArrayOf(Type.INT32)));
+ public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
+ new Field("group_generation_id",
+ Type.INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ Type.STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("assigned_partitions",
+ new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
+ public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
+
+ /* Heartbeat api */
+ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", Type.STRING, "The consumer group id."),
+ new Field("group_generation_id",
+ Type.INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ Type.STRING,
+ "The consumer id assigned by the group coordinator."));
+
+ public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16));
+
+ public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+ public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+
+ /* an array of all requests and responses with all schema versions */
+ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+
+ /* the latest version of each api */
+ public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+
+ static {
+ REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
+ REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
+ REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
+ REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
+ REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+ REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+ REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
+ REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
+ REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
+ REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
+ REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
+
+ RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
+ RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
+ RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
+ RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
+ RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
+ RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
+ RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
+ RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
+ RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
+ RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
+ RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
+ RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
+ RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
+
+ /* set the maximum version of each api */
+ for (ApiKeys api : ApiKeys.values())
+ CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
+
+ /* sanity check that we have the same number of request and response versions for each api */
+ for (ApiKeys api : ApiKeys.values())
+ if (REQUESTS[api.id].length != RESPONSES[api.id].length)
+ throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+ + " but " + RESPONSES[api.id].length + " response versions.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
new file mode 100644
index 0000000..ab5a607
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+public enum SecurityProtocol {
+ /** Un-authenticated, non-encrypted channel */
+ PLAINTEXT(0, "PLAINTEXT"),
+ /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
+ TRACE(Short.MAX_VALUE, "TRACE");
+
+ private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
+ private static final List<String> NAMES = new ArrayList<String>();
+
+ static {
+ for (SecurityProtocol proto: SecurityProtocol.values()) {
+ CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
+ NAMES.add(proto.name);
+ }
+ }
+
+ /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
+ public final short id;
+
+ /** Name of the security protocol. This may be used by client configuration. */
+ public final String name;
+
+ private SecurityProtocol(int id, String name) {
+ this.id = (short) id;
+ this.name = name;
+ }
+
+ public static String getName(int id) {
+ return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
+ }
+
+ public static List<String> getNames() {
+ return NAMES;
+ }
+
+ public static SecurityProtocol forId(Short id) {
+ return CODE_TO_SECURITY_PROTOCOL.get(id);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
new file mode 100644
index 0000000..d2468d8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Represents a type for an array of a particular type
+ */
+public class ArrayOf extends Type {
+
+ private final Type type;
+
+ public ArrayOf(Type type) {
+ this.type = type;
+ }
+
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ Object[] objs = (Object[]) o;
+ int size = objs.length;
+ buffer.putInt(size);
+ for (int i = 0; i < size; i++)
+ type.write(buffer, objs[i]);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ int size = buffer.getInt();
+ Object[] objs = new Object[size];
+ for (int i = 0; i < size; i++)
+ objs[i] = type.read(buffer);
+ return objs;
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ Object[] objs = (Object[]) o;
+ int size = 4;
+ for (int i = 0; i < objs.length; i++)
+ size += type.sizeOf(objs[i]);
+ return size;
+ }
+
+ public Type type() {
+ return type;
+ }
+
+ @Override
+ public String toString() {
+ return "ARRAY(" + type + ")";
+ }
+
+ @Override
+ public Object[] validate(Object item) {
+ try {
+ Object[] array = (Object[]) item;
+ for (int i = 0; i < array.length; i++)
+ type.validate(array[i]);
+ return array;
+ } catch (ClassCastException e) {
+ throw new SchemaException("Not an Object[].");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
new file mode 100644
index 0000000..b7d7720
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A field in a schema
+ */
+public class Field {
+
+ public static final Object NO_DEFAULT = new Object();
+
+ final int index;
+ public final String name;
+ public final Type type;
+ public final Object defaultValue;
+ public final String doc;
+ final Schema schema;
+
+ /**
+ * Create the field.
+ *
+ * @throws SchemaException If the default value is not primitive and the validation fails
+ */
+ public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
+ this.index = index;
+ this.name = name;
+ this.type = type;
+ this.doc = doc;
+ this.defaultValue = defaultValue;
+ this.schema = schema;
+ if (defaultValue != NO_DEFAULT)
+ type.validate(defaultValue);
+ }
+
+ public Field(int index, String name, Type type, String doc, Object defaultValue) {
+ this(index, name, type, doc, defaultValue, null);
+ }
+
+ public Field(String name, Type type, String doc, Object defaultValue) {
+ this(-1, name, type, doc, defaultValue);
+ }
+
+ public Field(String name, Type type, String doc) {
+ this(name, type, doc, NO_DEFAULT);
+ }
+
+ public Field(String name, Type type) {
+ this(name, type, "");
+ }
+
+ public Type type() {
+ return type;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
new file mode 100644
index 0000000..7adac52
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The schema for a compound record definition
+ */
+public class Schema extends Type {
+
+ private final Field[] fields;
+ private final Map<String, Field> fieldsByName;
+
+ /**
+ * Construct the schema with a given list of its field values
+ *
+ * @throws SchemaException If the given list have duplicate fields
+ */
+ public Schema(Field... fs) {
+ this.fields = new Field[fs.length];
+ this.fieldsByName = new HashMap<String, Field>();
+ for (int i = 0; i < this.fields.length; i++) {
+ Field field = fs[i];
+ if (fieldsByName.containsKey(field.name))
+ throw new SchemaException("Schema contains a duplicate field: " + field.name);
+ this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
+ this.fieldsByName.put(fs[i].name, this.fields[i]);
+ }
+ }
+
+ /**
+ * Write a struct to the buffer
+ */
+ public void write(ByteBuffer buffer, Object o) {
+ Struct r = (Struct) o;
+ for (int i = 0; i < fields.length; i++) {
+ Field f = fields[i];
+ try {
+ Object value = f.type().validate(r.get(f));
+ f.type.write(buffer, value);
+ } catch (Exception e) {
+ throw new SchemaException("Error writing field '" + f.name +
+ "': " +
+ (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+ }
+ }
+ }
+
+ /**
+ * Read a struct from the buffer
+ */
+ public Object read(ByteBuffer buffer) {
+ Object[] objects = new Object[fields.length];
+ for (int i = 0; i < fields.length; i++) {
+ try {
+ objects[i] = fields[i].type.read(buffer);
+ } catch (Exception e) {
+ throw new SchemaException("Error reading field '" + fields[i].name +
+ "': " +
+ (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
+ }
+ }
+ return new Struct(this, objects);
+ }
+
+ /**
+ * The size of the given record
+ */
+ public int sizeOf(Object o) {
+ int size = 0;
+ Struct r = (Struct) o;
+ for (int i = 0; i < fields.length; i++)
+ size += fields[i].type.sizeOf(r.get(fields[i]));
+ return size;
+ }
+
+ /**
+ * The number of fields in this schema
+ */
+ public int numFields() {
+ return this.fields.length;
+ }
+
+ /**
+ * Get a field by its slot in the record array
+ *
+ * @param slot The slot at which this field sits
+ * @return The field
+ */
+ public Field get(int slot) {
+ return this.fields[slot];
+ }
+
+ /**
+ * Get a field by its name
+ *
+ * @param name The name of the field
+ * @return The field
+ */
+ public Field get(String name) {
+ return this.fieldsByName.get(name);
+ }
+
+ /**
+ * Get all the fields in this schema
+ */
+ public Field[] fields() {
+ return this.fields;
+ }
+
+ /**
+ * Display a string representation of the schema
+ */
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ for (int i = 0; i < this.fields.length; i++) {
+ b.append(this.fields[i].name);
+ b.append(':');
+ b.append(this.fields[i].type());
+ if (i < this.fields.length - 1)
+ b.append(',');
+ }
+ b.append("}");
+ return b.toString();
+ }
+
+ @Override
+ public Struct validate(Object item) {
+ try {
+ Struct struct = (Struct) item;
+ for (int i = 0; i < this.fields.length; i++) {
+ Field field = this.fields[i];
+ try {
+ field.type.validate(struct.get(field));
+ } catch (SchemaException e) {
+ throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
+ }
+ }
+ return struct;
+ } catch (ClassCastException e) {
+ throw new SchemaException("Not a Struct.");
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
new file mode 100644
index 0000000..86c141e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Thrown if the protocol schema validation fails while parsing request or response.
+ */
+public class SchemaException extends KafkaException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SchemaException(String message) {
+ super(message);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
new file mode 100644
index 0000000..482fe9d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A record that can be serialized and deserialized according to a pre-defined schema
+ */
+public class Struct {
+ private final Schema schema;
+ private final Object[] values;
+
+ Struct(Schema schema, Object[] values) {
+ this.schema = schema;
+ this.values = values;
+ }
+
+ public Struct(Schema schema) {
+ this.schema = schema;
+ this.values = new Object[this.schema.numFields()];
+ }
+
+ /**
+ * The schema for this struct.
+ */
+ public Schema schema() {
+ return this.schema;
+ }
+
+ /**
+ * Return the value of the given pre-validated field, or if the value is missing return the default value.
+ *
+ * @param field The field for which to get the default value
+ * @throws SchemaException if the field has no value and has no default.
+ */
+ private Object getFieldOrDefault(Field field) {
+ Object value = this.values[field.index];
+ if (value != null)
+ return value;
+ else if (field.defaultValue != Field.NO_DEFAULT)
+ return field.defaultValue;
+ else
+ throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
+ }
+
+ /**
+ * Get the value for the field directly by the field index with no lookup needed (faster!)
+ *
+ * @param field The field to look up
+ * @return The value for that field.
+ * @throws SchemaException if the field has no value and has no default.
+ */
+ public Object get(Field field) {
+ validateField(field);
+ return getFieldOrDefault(field);
+ }
+
+ /**
+ * Get the record value for the field with the given name by doing a hash table lookup (slower!)
+ *
+ * @param name The name of the field
+ * @return The value in the record
+ * @throws SchemaException If no such field exists
+ */
+ public Object get(String name) {
+ Field field = schema.get(name);
+ if (field == null)
+ throw new SchemaException("No such field: " + name);
+ return getFieldOrDefault(field);
+ }
+
+ /**
+ * Check if the struct contains a field.
+ * @param name
+ * @return Whether a field exists.
+ */
+ public boolean hasField(String name) {
+ return schema.get(name) != null;
+ }
+
+ public Struct getStruct(Field field) {
+ return (Struct) get(field);
+ }
+
+ public Struct getStruct(String name) {
+ return (Struct) get(name);
+ }
+
+ public Short getShort(Field field) {
+ return (Short) get(field);
+ }
+
+ public Short getShort(String name) {
+ return (Short) get(name);
+ }
+
+ public Integer getInt(Field field) {
+ return (Integer) get(field);
+ }
+
+ public Integer getInt(String name) {
+ return (Integer) get(name);
+ }
+
+ public Long getLong(Field field) {
+ return (Long) get(field);
+ }
+
+ public Long getLong(String name) {
+ return (Long) get(name);
+ }
+
+ public Object[] getArray(Field field) {
+ return (Object[]) get(field);
+ }
+
+ public Object[] getArray(String name) {
+ return (Object[]) get(name);
+ }
+
+ public String getString(Field field) {
+ return (String) get(field);
+ }
+
+ public String getString(String name) {
+ return (String) get(name);
+ }
+
+ public ByteBuffer getBytes(Field field) {
+ return (ByteBuffer) get(field);
+ }
+
+ public ByteBuffer getBytes(String name) {
+ return (ByteBuffer) get(name);
+ }
+
+ /**
+ * Set the given field to the specified value
+ *
+ * @param field The field
+ * @param value The value
+ * @throws SchemaException If the validation of the field failed
+ */
+ public Struct set(Field field, Object value) {
+ validateField(field);
+ this.values[field.index] = value;
+ return this;
+ }
+
+ /**
+ * Set the field specified by the given name to the value
+ *
+ * @param name The name of the field
+ * @param value The value to set
+ * @throws SchemaException If the field is not known
+ */
+ public Struct set(String name, Object value) {
+ Field field = this.schema.get(name);
+ if (field == null)
+ throw new SchemaException("Unknown field: " + name);
+ this.values[field.index] = value;
+ return this;
+ }
+
+ /**
+ * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
+ * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
+ * instantiated with this method.
+ *
+ * @param field The field to create an instance of
+ * @return The struct
+ * @throws SchemaException If the given field is not a container type
+ */
+ public Struct instance(Field field) {
+ validateField(field);
+ if (field.type() instanceof Schema) {
+ return new Struct((Schema) field.type());
+ } else if (field.type() instanceof ArrayOf) {
+ ArrayOf array = (ArrayOf) field.type();
+ return new Struct((Schema) array.type());
+ } else {
+ throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
+ }
+ }
+
+ /**
+ * Create a struct instance for the given field which must be a container type (struct or array)
+ *
+ * @param field The name of the field to create (field must be a schema type)
+ * @return The struct
+ * @throws SchemaException If the given field is not a container type
+ */
+ public Struct instance(String field) {
+ return instance(schema.get(field));
+ }
+
+ /**
+ * Empty all the values from this record
+ */
+ public void clear() {
+ Arrays.fill(this.values, null);
+ }
+
+ /**
+ * Get the serialized size of this object
+ */
+ public int sizeOf() {
+ return this.schema.sizeOf(this);
+ }
+
+ /**
+ * Write this struct to a buffer
+ */
+ public void writeTo(ByteBuffer buffer) {
+ this.schema.write(buffer, this);
+ }
+
+ /**
+ * Ensure the user doesn't try to access fields from the wrong schema
+ *
+ * @throws SchemaException If validation fails
+ */
+ private void validateField(Field field) {
+ if (this.schema != field.schema)
+ throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
+ if (field.index > values.length)
+ throw new SchemaException("Invalid field index: " + field.index);
+ }
+
+ /**
+ * Validate the contents of this struct against its schema
+ *
+ * @throws SchemaException If validation fails
+ */
+ public void validate() {
+ this.schema.validate(this);
+ }
+
+ /**
+ * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
+ * the struct into multiple ByteBuffers if need be.
+ */
+ public ByteBuffer[] toBytes() {
+ ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
+ writeTo(buffer);
+ return new ByteBuffer[] {buffer};
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder b = new StringBuilder();
+ b.append('{');
+ for (int i = 0; i < this.values.length; i++) {
+ Field f = this.schema.get(i);
+ b.append(f.name);
+ b.append('=');
+ if (f.type() instanceof ArrayOf) {
+ Object[] arrayValue = (Object[]) this.values[i];
+ b.append('[');
+ for (int j = 0; j < arrayValue.length; j++) {
+ b.append(arrayValue[j]);
+ if (j < arrayValue.length - 1)
+ b.append(',');
+ }
+ b.append(']');
+ } else
+ b.append(this.values[i]);
+ if (i < this.values.length - 1)
+ b.append(',');
+ }
+ b.append('}');
+ return b.toString();
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ for (int i = 0; i < this.values.length; i++) {
+ Field f = this.schema.get(i);
+ if (f.type() instanceof ArrayOf) {
+ Object[] arrayObject = (Object[]) this.get(f);
+ for (Object arrayItem: arrayObject)
+ result = prime * result + arrayItem.hashCode();
+ } else {
+ result = prime * result + this.get(f).hashCode();
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ Struct other = (Struct) obj;
+ if (schema != other.schema)
+ return false;
+ for (int i = 0; i < this.values.length; i++) {
+ Field f = this.schema.get(i);
+ Boolean result;
+ if (f.type() instanceof ArrayOf) {
+ result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
+ } else {
+ result = this.get(f).equals(other.get(f));
+ }
+ if (!result)
+ return false;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
new file mode 100644
index 0000000..26bdd2f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.protocol.types;
+
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A serializable type
+ */
+public abstract class Type {
+
+ /**
+ * Write the typed object to the buffer
+ *
+ * @throws SchemaException If the object is not valid for its type
+ */
+ public abstract void write(ByteBuffer buffer, Object o);
+
+ /**
+ * Read the typed object from the buffer
+ *
+ * @throws SchemaException If the object is not valid for its type
+ */
+ public abstract Object read(ByteBuffer buffer);
+
+ /**
+ * Validate the object. If succeeded return its typed object.
+ *
+ * @throws SchemaException If validation failed
+ */
+ public abstract Object validate(Object o);
+
+ /**
+ * Return the size of the object in bytes
+ */
+ public abstract int sizeOf(Object o);
+
+ public static final Type INT8 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.put((Byte) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.get();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 1;
+ }
+
+ @Override
+ public String toString() {
+ return "INT8";
+ }
+
+ @Override
+ public Byte validate(Object item) {
+ if (item instanceof Byte)
+ return (Byte) item;
+ else
+ throw new SchemaException(item + " is not a Byte.");
+ }
+ };
+
+ public static final Type INT16 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putShort((Short) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getShort();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 2;
+ }
+
+ @Override
+ public String toString() {
+ return "INT16";
+ }
+
+ @Override
+ public Short validate(Object item) {
+ if (item instanceof Short)
+ return (Short) item;
+ else
+ throw new SchemaException(item + " is not a Short.");
+ }
+ };
+
+ public static final Type INT32 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putInt((Integer) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getInt();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 4;
+ }
+
+ @Override
+ public String toString() {
+ return "INT32";
+ }
+
+ @Override
+ public Integer validate(Object item) {
+ if (item instanceof Integer)
+ return (Integer) item;
+ else
+ throw new SchemaException(item + " is not an Integer.");
+ }
+ };
+
+ public static final Type INT64 = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ buffer.putLong((Long) o);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ return buffer.getLong();
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 8;
+ }
+
+ @Override
+ public String toString() {
+ return "INT64";
+ }
+
+ @Override
+ public Long validate(Object item) {
+ if (item instanceof Long)
+ return (Long) item;
+ else
+ throw new SchemaException(item + " is not a Long.");
+ }
+ };
+
+ public static final Type STRING = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ byte[] bytes = Utils.utf8((String) o);
+ if (bytes.length > Short.MAX_VALUE)
+ throw new SchemaException("String is longer than the maximum string length.");
+ buffer.putShort((short) bytes.length);
+ buffer.put(bytes);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ int length = buffer.getShort();
+ byte[] bytes = new byte[length];
+ buffer.get(bytes);
+ return Utils.utf8(bytes);
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ return 2 + Utils.utf8Length((String) o);
+ }
+
+ @Override
+ public String toString() {
+ return "STRING";
+ }
+
+ @Override
+ public String validate(Object item) {
+ if (item instanceof String)
+ return (String) item;
+ else
+ throw new SchemaException(item + " is not a String.");
+ }
+ };
+
+ public static final Type BYTES = new Type() {
+ @Override
+ public void write(ByteBuffer buffer, Object o) {
+ ByteBuffer arg = (ByteBuffer) o;
+ int pos = arg.position();
+ buffer.putInt(arg.remaining());
+ buffer.put(arg);
+ arg.position(pos);
+ }
+
+ @Override
+ public Object read(ByteBuffer buffer) {
+ int size = buffer.getInt();
+ ByteBuffer val = buffer.slice();
+ val.limit(size);
+ buffer.position(buffer.position() + size);
+ return val;
+ }
+
+ @Override
+ public int sizeOf(Object o) {
+ ByteBuffer buffer = (ByteBuffer) o;
+ return 4 + buffer.remaining();
+ }
+
+ @Override
+ public String toString() {
+ return "BYTES";
+ }
+
+ @Override
+ public ByteBuffer validate(Object item) {
+ if (item instanceof ByteBuffer)
+ return (ByteBuffer) item;
+ else
+ throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
new file mode 100644
index 0000000..99a20a3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A byte buffer backed input outputStream
+ */
+public class ByteBufferInputStream extends InputStream {
+
+ private ByteBuffer buffer;
+
+ public ByteBufferInputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public int read() {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+ return buffer.get() & 0xFF;
+ }
+
+ public int read(byte[] bytes, int off, int len) {
+ if (!buffer.hasRemaining()) {
+ return -1;
+ }
+
+ len = Math.min(len, buffer.remaining());
+ buffer.get(bytes, off, len);
+ return len;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
new file mode 100644
index 0000000..a334755
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A byte buffer backed output outputStream
+ */
+public class ByteBufferOutputStream extends OutputStream {
+
+ private static final float REALLOCATION_FACTOR = 1.1f;
+
+ private ByteBuffer buffer;
+
+ public ByteBufferOutputStream(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
+
+ public void write(int b) {
+ if (buffer.remaining() < 1)
+ expandBuffer(buffer.capacity() + 1);
+ buffer.put((byte) b);
+ }
+
+ public void write(byte[] bytes, int off, int len) {
+ if (buffer.remaining() < len)
+ expandBuffer(buffer.capacity() + len);
+ buffer.put(bytes, off, len);
+ }
+
+ public ByteBuffer buffer() {
+ return buffer;
+ }
+
+ private void expandBuffer(int size) {
+ int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
+ ByteBuffer temp = ByteBuffer.allocate(expandSize);
+ temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
+ buffer = temp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
new file mode 100644
index 0000000..9961766
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.record;
+
+// ----------------------------------------------------------------------------
+// This class is copied from the Apache Kafka project.
+//
+// The class is part of a "backport" of the new consumer API, in order to
+// give Flink access to its functionality until the API is properly released.
+//
+// This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The compression type to use
+ */
+public enum CompressionType {
+ NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
+
+ public final int id;
+ public final String name;
+ public final float rate;
+
+ private CompressionType(int id, String name, float rate) {
+ this.id = id;
+ this.name = name;
+ this.rate = rate;
+ }
+
+ public static CompressionType forId(int id) {
+ switch (id) {
+ case 0:
+ return NONE;
+ case 1:
+ return GZIP;
+ case 2:
+ return SNAPPY;
+ case 3:
+ return LZ4;
+ default:
+ throw new IllegalArgumentException("Unknown compression type id: " + id);
+ }
+ }
+
+ public static CompressionType forName(String name) {
+ if (NONE.name.equals(name))
+ return NONE;
+ else if (GZIP.name.equals(name))
+ return GZIP;
+ else if (SNAPPY.name.equals(name))
+ return SNAPPY;
+ else if (LZ4.name.equals(name))
+ return LZ4;
+ else
+ throw new IllegalArgumentException("Unknown compression name: " + name);
+ }
+
+}