You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:59 UTC
[42/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
deleted file mode 100644
index f7c8981..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/Protocol.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import org.apache.flink.kafka_backport.common.protocol.types.ArrayOf;
-import org.apache.flink.kafka_backport.common.protocol.types.Field;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Type;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class Protocol {
-
- public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", Type.INT16, "The id of the request type."),
- new Field("api_version", Type.INT16, "The version of the API."),
- new Field("correlation_id",
- Type.INT32,
- "A user-supplied integer value that will be passed back with the response"),
- new Field("client_id",
- Type.STRING,
- "A user specified identifier for the client making the request."));
-
- public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
- Type.INT32,
- "The user-supplied value passed in with the request"));
-
- /* Metadata api */
-
- public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
- new ArrayOf(Type.STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
-
- public static final Schema BROKER = new Schema(new Field("node_id", Type.INT32, "The broker id."),
- new Field("host", Type.STRING, "The hostname of the broker."),
- new Field("port",
- Type.INT32,
- "The port on which the broker accepts requests."));
-
- public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
- Type.INT16,
- "The error code for the partition, if any."),
- new Field("partition_id",
- Type.INT32,
- "The id of the partition."),
- new Field("leader",
- Type.INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(Type.INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(Type.INT32),
- "The set of nodes that are in sync with the leader for this partition."));
-
- public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
- Type.INT16,
- "The error code for the given topic."),
- new Field("topic", Type.STRING, "The name of the topic"),
- new Field("partition_metadata",
- new ArrayOf(PARTITION_METADATA_V0),
- "Metadata for each partition of the topic."));
-
- public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(BROKER),
- "Host and port information for all brokers."),
- new Field("topic_metadata",
- new ArrayOf(TOPIC_METADATA_V0)));
-
- public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
- public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
-
- /* Produce api */
-
- public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("data", new ArrayOf(new Schema(new Field("partition", Type.INT32),
- new Field("record_set", Type.BYTES)))));
-
- public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
- Type.INT16,
- "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
- new Field("timeout", Type.INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", Type.STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- Type.INT32),
- new Field("error_code",
- Type.INT16),
- new Field("base_offset",
- Type.INT64))))))));
-
- public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
- public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
-
- /* Offset commit api */
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("offset",
- Type.INT64,
- "Message offset to be committed."),
- new Field("metadata",
- Type.STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("offset",
- Type.INT64,
- "Message offset to be committed."),
- new Field("timestamp",
- Type.INT64,
- "Timestamp of the commit"),
- new Field("metadata",
- Type.STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V2 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("offset",
- Type.INT64,
- "Message offset to be committed."),
- new Field("metadata",
- Type.STRING,
- "Any associated metadata the client wants to keep."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- Type.STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
- Type.STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V2 = new Schema(new Field("topic",
- Type.STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V2),
- "Partitions to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."),
- new Field("group_generation_id",
- Type.INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- Type.STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
- "Topics to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_REQUEST_V2 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."),
- new Field("group_generation_id",
- Type.INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- Type.STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("retention_time",
- Type.INT64,
- "Time period in ms to retain the offset."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
- "Topics to commit offsets."));
-
- public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("error_code",
- Type.INT16));
-
- public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
- public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2};
-
- /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */
- public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
- public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0;
-
- public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2};
-
- /* Offset fetch api */
-
- /*
- * Wire formats of version 0 and 1 are the same, but with different functionality.
- * Version 0 will read the offsets from ZK;
- * Version 1 will read the offsets from Kafka.
- */
- public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."));
-
- public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- Type.STRING,
- "Topic to fetch offset."),
- new Field("partitions",
- new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch offsets."));
-
- public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch offsets."));
-
- public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("offset",
- Type.INT64,
- "Last committed message offset."),
- new Field("metadata",
- Type.STRING,
- "Any associated metadata the client wants to keep."),
- new Field("error_code", Type.INT16));
-
- public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
-
- public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
-
- public static final Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
- public static final Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
-
- public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1};
- public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1};
-
- /* List offset api */
- public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("timestamp", Type.INT64, "Timestamp."),
- new Field("max_num_offsets",
- Type.INT32,
- "Maximum offsets to return."));
-
- public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- Type.STRING,
- "Topic to list offset."),
- new Field("partitions",
- new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
- "Partitions to list offset."));
-
- public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
- Type.INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
- "Topics to list offsets."));
-
- public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("error_code", Type.INT16),
- new Field("offsets",
- new ArrayOf(Type.INT64),
- "A list of offsets."));
-
- public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("partition_responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
- public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
- public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
-
- /* Fetch api */
- public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("fetch_offset",
- Type.INT64,
- "Message offset."),
- new Field("max_bytes",
- Type.INT32,
- "Maximum bytes to fetch."));
-
- public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", Type.STRING, "Topic to fetch."),
- new Field("partitions",
- new ArrayOf(FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch."));
-
- public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
- Type.INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- Type.INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- Type.INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch."));
-
- public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- Type.INT32,
- "Topic partition id."),
- new Field("error_code", Type.INT16),
- new Field("high_watermark",
- Type.INT64,
- "Last committed offset."),
- new Field("record_set", Type.BYTES));
-
- public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("partition_responses",
- new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
-
- public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
- public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
-
- /* Consumer metadata api */
- public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."));
-
- public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
- new Field("coordinator",
- BROKER,
- "Host and port information for the coordinator for a consumer group."));
-
- public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
- public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
-
- /* Join group api */
- public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
- Type.STRING,
- "The consumer group id."),
- new Field("session_timeout",
- Type.INT32,
- "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("topics",
- new ArrayOf(Type.STRING),
- "An array of topics to subscribe to."),
- new Field("consumer_id",
- Type.STRING,
- "The assigned consumer id or an empty string for a new consumer."),
- new Field("partition_assignment_strategy",
- Type.STRING,
- "The strategy for the coordinator to assign partitions."));
-
- public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", Type.STRING),
- new Field("partitions", new ArrayOf(Type.INT32)));
- public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16),
- new Field("group_generation_id",
- Type.INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- Type.STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("assigned_partitions",
- new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
-
- public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
- public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
-
- /* Heartbeat api */
- public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", Type.STRING, "The consumer group id."),
- new Field("group_generation_id",
- Type.INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- Type.STRING,
- "The consumer id assigned by the group coordinator."));
-
- public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", Type.INT16));
-
- public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
- public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
-
- /* an array of all requests and responses with all schema versions */
- public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
- public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
-
- /* the latest version of each api */
- public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
-
- static {
- REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
- REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST;
- REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST;
- REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST;
- REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- REQUESTS[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
- REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
- REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
- REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
- REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST;
- REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
- REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
-
- RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
- RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
- RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE;
- RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE;
- RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {};
- RESPONSES[ApiKeys.STOP_REPLICA.id] = new Schema[] {};
- RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {};
- RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {};
- RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
- RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
- RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE;
- RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
- RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
-
- /* set the maximum version of each api */
- for (ApiKeys api : ApiKeys.values())
- CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1);
-
- /* sanity check that we have the same number of request and response versions for each api */
- for (ApiKeys api : ApiKeys.values())
- if (REQUESTS[api.id].length != RESPONSES[api.id].length)
- throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
- + " but " + RESPONSES[api.id].length + " response versions.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
deleted file mode 100644
index ab5a607..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/SecurityProtocol.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public enum SecurityProtocol {
- /** Un-authenticated, non-encrypted channel */
- PLAINTEXT(0, "PLAINTEXT"),
- /** Currently identical to PLAINTEXT and used for testing only. We may implement extra instrumentation when testing channel code. */
- TRACE(Short.MAX_VALUE, "TRACE");
-
- private static final Map<Short, SecurityProtocol> CODE_TO_SECURITY_PROTOCOL = new HashMap<Short, SecurityProtocol>();
- private static final List<String> NAMES = new ArrayList<String>();
-
- static {
- for (SecurityProtocol proto: SecurityProtocol.values()) {
- CODE_TO_SECURITY_PROTOCOL.put(proto.id, proto);
- NAMES.add(proto.name);
- }
- }
-
- /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
- public final short id;
-
- /** Name of the security protocol. This may be used by client configuration. */
- public final String name;
-
- private SecurityProtocol(int id, String name) {
- this.id = (short) id;
- this.name = name;
- }
-
- public static String getName(int id) {
- return CODE_TO_SECURITY_PROTOCOL.get((short) id).name;
- }
-
- public static List<String> getNames() {
- return NAMES;
- }
-
- public static SecurityProtocol forId(Short id) {
- return CODE_TO_SECURITY_PROTOCOL.get(id);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
deleted file mode 100644
index d2468d8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/ArrayOf.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Represents a type for an array of a particular type
- */
-public class ArrayOf extends Type {
-
- private final Type type;
-
- public ArrayOf(Type type) {
- this.type = type;
- }
-
- @Override
- public void write(ByteBuffer buffer, Object o) {
- Object[] objs = (Object[]) o;
- int size = objs.length;
- buffer.putInt(size);
- for (int i = 0; i < size; i++)
- type.write(buffer, objs[i]);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- int size = buffer.getInt();
- Object[] objs = new Object[size];
- for (int i = 0; i < size; i++)
- objs[i] = type.read(buffer);
- return objs;
- }
-
- @Override
- public int sizeOf(Object o) {
- Object[] objs = (Object[]) o;
- int size = 4;
- for (int i = 0; i < objs.length; i++)
- size += type.sizeOf(objs[i]);
- return size;
- }
-
- public Type type() {
- return type;
- }
-
- @Override
- public String toString() {
- return "ARRAY(" + type + ")";
- }
-
- @Override
- public Object[] validate(Object item) {
- try {
- Object[] array = (Object[]) item;
- for (int i = 0; i < array.length; i++)
- type.validate(array[i]);
- return array;
- } catch (ClassCastException e) {
- throw new SchemaException("Not an Object[].");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
deleted file mode 100644
index b7d7720..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Field.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A field in a schema
- */
-public class Field {
-
- public static final Object NO_DEFAULT = new Object();
-
- final int index;
- public final String name;
- public final Type type;
- public final Object defaultValue;
- public final String doc;
- final Schema schema;
-
- /**
- * Create the field.
- *
- * @throws SchemaException If the default value is not primitive and the validation fails
- */
- public Field(int index, String name, Type type, String doc, Object defaultValue, Schema schema) {
- this.index = index;
- this.name = name;
- this.type = type;
- this.doc = doc;
- this.defaultValue = defaultValue;
- this.schema = schema;
- if (defaultValue != NO_DEFAULT)
- type.validate(defaultValue);
- }
-
- public Field(int index, String name, Type type, String doc, Object defaultValue) {
- this(index, name, type, doc, defaultValue, null);
- }
-
- public Field(String name, Type type, String doc, Object defaultValue) {
- this(-1, name, type, doc, defaultValue);
- }
-
- public Field(String name, Type type, String doc) {
- this(name, type, doc, NO_DEFAULT);
- }
-
- public Field(String name, Type type) {
- this(name, type, "");
- }
-
- public Type type() {
- return type;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
deleted file mode 100644
index 7adac52..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Schema.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The schema for a compound record definition
- */
-public class Schema extends Type {
-
- private final Field[] fields;
- private final Map<String, Field> fieldsByName;
-
- /**
- * Construct the schema with a given list of its field values
- *
- * @throws SchemaException If the given list have duplicate fields
- */
- public Schema(Field... fs) {
- this.fields = new Field[fs.length];
- this.fieldsByName = new HashMap<String, Field>();
- for (int i = 0; i < this.fields.length; i++) {
- Field field = fs[i];
- if (fieldsByName.containsKey(field.name))
- throw new SchemaException("Schema contains a duplicate field: " + field.name);
- this.fields[i] = new Field(i, field.name, field.type, field.doc, field.defaultValue, this);
- this.fieldsByName.put(fs[i].name, this.fields[i]);
- }
- }
-
- /**
- * Write a struct to the buffer
- */
- public void write(ByteBuffer buffer, Object o) {
- Struct r = (Struct) o;
- for (int i = 0; i < fields.length; i++) {
- Field f = fields[i];
- try {
- Object value = f.type().validate(r.get(f));
- f.type.write(buffer, value);
- } catch (Exception e) {
- throw new SchemaException("Error writing field '" + f.name +
- "': " +
- (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
- }
- }
- }
-
- /**
- * Read a struct from the buffer
- */
- public Object read(ByteBuffer buffer) {
- Object[] objects = new Object[fields.length];
- for (int i = 0; i < fields.length; i++) {
- try {
- objects[i] = fields[i].type.read(buffer);
- } catch (Exception e) {
- throw new SchemaException("Error reading field '" + fields[i].name +
- "': " +
- (e.getMessage() == null ? e.getClass().getName() : e.getMessage()));
- }
- }
- return new Struct(this, objects);
- }
-
- /**
- * The size of the given record
- */
- public int sizeOf(Object o) {
- int size = 0;
- Struct r = (Struct) o;
- for (int i = 0; i < fields.length; i++)
- size += fields[i].type.sizeOf(r.get(fields[i]));
- return size;
- }
-
- /**
- * The number of fields in this schema
- */
- public int numFields() {
- return this.fields.length;
- }
-
- /**
- * Get a field by its slot in the record array
- *
- * @param slot The slot at which this field sits
- * @return The field
- */
- public Field get(int slot) {
- return this.fields[slot];
- }
-
- /**
- * Get a field by its name
- *
- * @param name The name of the field
- * @return The field
- */
- public Field get(String name) {
- return this.fieldsByName.get(name);
- }
-
- /**
- * Get all the fields in this schema
- */
- public Field[] fields() {
- return this.fields;
- }
-
- /**
- * Display a string representation of the schema
- */
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- for (int i = 0; i < this.fields.length; i++) {
- b.append(this.fields[i].name);
- b.append(':');
- b.append(this.fields[i].type());
- if (i < this.fields.length - 1)
- b.append(',');
- }
- b.append("}");
- return b.toString();
- }
-
- @Override
- public Struct validate(Object item) {
- try {
- Struct struct = (Struct) item;
- for (int i = 0; i < this.fields.length; i++) {
- Field field = this.fields[i];
- try {
- field.type.validate(struct.get(field));
- } catch (SchemaException e) {
- throw new SchemaException("Invalid value for field '" + field.name + "': " + e.getMessage());
- }
- }
- return struct;
- } catch (ClassCastException e) {
- throw new SchemaException("Not a Struct.");
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
deleted file mode 100644
index 86c141e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/SchemaException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-import org.apache.flink.kafka_backport.common.KafkaException;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * Thrown if the protocol schema validation fails while parsing request or response.
- */
-public class SchemaException extends KafkaException {
-
- private static final long serialVersionUID = 1L;
-
- public SchemaException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
deleted file mode 100644
index 482fe9d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Struct.java
+++ /dev/null
@@ -1,338 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A record that can be serialized and deserialized according to a pre-defined schema
- */
-public class Struct {
- private final Schema schema;
- private final Object[] values;
-
- Struct(Schema schema, Object[] values) {
- this.schema = schema;
- this.values = values;
- }
-
- public Struct(Schema schema) {
- this.schema = schema;
- this.values = new Object[this.schema.numFields()];
- }
-
- /**
- * The schema for this struct.
- */
- public Schema schema() {
- return this.schema;
- }
-
- /**
- * Return the value of the given pre-validated field, or if the value is missing return the default value.
- *
- * @param field The field for which to get the default value
- * @throws SchemaException if the field has no value and has no default.
- */
- private Object getFieldOrDefault(Field field) {
- Object value = this.values[field.index];
- if (value != null)
- return value;
- else if (field.defaultValue != Field.NO_DEFAULT)
- return field.defaultValue;
- else
- throw new SchemaException("Missing value for field '" + field.name + "' which has no default value.");
- }
-
- /**
- * Get the value for the field directly by the field index with no lookup needed (faster!)
- *
- * @param field The field to look up
- * @return The value for that field.
- * @throws SchemaException if the field has no value and has no default.
- */
- public Object get(Field field) {
- validateField(field);
- return getFieldOrDefault(field);
- }
-
- /**
- * Get the record value for the field with the given name by doing a hash table lookup (slower!)
- *
- * @param name The name of the field
- * @return The value in the record
- * @throws SchemaException If no such field exists
- */
- public Object get(String name) {
- Field field = schema.get(name);
- if (field == null)
- throw new SchemaException("No such field: " + name);
- return getFieldOrDefault(field);
- }
-
- /**
- * Check if the struct contains a field.
- * @param name
- * @return Whether a field exists.
- */
- public boolean hasField(String name) {
- return schema.get(name) != null;
- }
-
- public Struct getStruct(Field field) {
- return (Struct) get(field);
- }
-
- public Struct getStruct(String name) {
- return (Struct) get(name);
- }
-
- public Short getShort(Field field) {
- return (Short) get(field);
- }
-
- public Short getShort(String name) {
- return (Short) get(name);
- }
-
- public Integer getInt(Field field) {
- return (Integer) get(field);
- }
-
- public Integer getInt(String name) {
- return (Integer) get(name);
- }
-
- public Long getLong(Field field) {
- return (Long) get(field);
- }
-
- public Long getLong(String name) {
- return (Long) get(name);
- }
-
- public Object[] getArray(Field field) {
- return (Object[]) get(field);
- }
-
- public Object[] getArray(String name) {
- return (Object[]) get(name);
- }
-
- public String getString(Field field) {
- return (String) get(field);
- }
-
- public String getString(String name) {
- return (String) get(name);
- }
-
- public ByteBuffer getBytes(Field field) {
- return (ByteBuffer) get(field);
- }
-
- public ByteBuffer getBytes(String name) {
- return (ByteBuffer) get(name);
- }
-
- /**
- * Set the given field to the specified value
- *
- * @param field The field
- * @param value The value
- * @throws SchemaException If the validation of the field failed
- */
- public Struct set(Field field, Object value) {
- validateField(field);
- this.values[field.index] = value;
- return this;
- }
-
- /**
- * Set the field specified by the given name to the value
- *
- * @param name The name of the field
- * @param value The value to set
- * @throws SchemaException If the field is not known
- */
- public Struct set(String name, Object value) {
- Field field = this.schema.get(name);
- if (field == null)
- throw new SchemaException("Unknown field: " + name);
- this.values[field.index] = value;
- return this;
- }
-
- /**
- * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
- * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
- * instantiated with this method.
- *
- * @param field The field to create an instance of
- * @return The struct
- * @throws SchemaException If the given field is not a container type
- */
- public Struct instance(Field field) {
- validateField(field);
- if (field.type() instanceof Schema) {
- return new Struct((Schema) field.type());
- } else if (field.type() instanceof ArrayOf) {
- ArrayOf array = (ArrayOf) field.type();
- return new Struct((Schema) array.type());
- } else {
- throw new SchemaException("Field '" + field.name + "' is not a container type, it is of type " + field.type());
- }
- }
-
- /**
- * Create a struct instance for the given field which must be a container type (struct or array)
- *
- * @param field The name of the field to create (field must be a schema type)
- * @return The struct
- * @throws SchemaException If the given field is not a container type
- */
- public Struct instance(String field) {
- return instance(schema.get(field));
- }
-
- /**
- * Empty all the values from this record
- */
- public void clear() {
- Arrays.fill(this.values, null);
- }
-
- /**
- * Get the serialized size of this object
- */
- public int sizeOf() {
- return this.schema.sizeOf(this);
- }
-
- /**
- * Write this struct to a buffer
- */
- public void writeTo(ByteBuffer buffer) {
- this.schema.write(buffer, this);
- }
-
- /**
- * Ensure the user doesn't try to access fields from the wrong schema
- *
- * @throws SchemaException If validation fails
- */
- private void validateField(Field field) {
- if (this.schema != field.schema)
- throw new SchemaException("Attempt to access field '" + field.name + "' from a different schema instance.");
- if (field.index > values.length)
- throw new SchemaException("Invalid field index: " + field.index);
- }
-
- /**
- * Validate the contents of this struct against its schema
- *
- * @throws SchemaException If validation fails
- */
- public void validate() {
- this.schema.validate(this);
- }
-
- /**
- * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break
- * the struct into multiple ByteBuffers if need be.
- */
- public ByteBuffer[] toBytes() {
- ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
- writeTo(buffer);
- return new ByteBuffer[] {buffer};
- }
-
- @Override
- public String toString() {
- StringBuilder b = new StringBuilder();
- b.append('{');
- for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
- b.append(f.name);
- b.append('=');
- if (f.type() instanceof ArrayOf) {
- Object[] arrayValue = (Object[]) this.values[i];
- b.append('[');
- for (int j = 0; j < arrayValue.length; j++) {
- b.append(arrayValue[j]);
- if (j < arrayValue.length - 1)
- b.append(',');
- }
- b.append(']');
- } else
- b.append(this.values[i]);
- if (i < this.values.length - 1)
- b.append(',');
- }
- b.append('}');
- return b.toString();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
- if (f.type() instanceof ArrayOf) {
- Object[] arrayObject = (Object[]) this.get(f);
- for (Object arrayItem: arrayObject)
- result = prime * result + arrayItem.hashCode();
- } else {
- result = prime * result + this.get(f).hashCode();
- }
- }
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Struct other = (Struct) obj;
- if (schema != other.schema)
- return false;
- for (int i = 0; i < this.values.length; i++) {
- Field f = this.schema.get(i);
- Boolean result;
- if (f.type() instanceof ArrayOf) {
- result = Arrays.equals((Object[]) this.get(f), (Object[]) other.get(f));
- } else {
- result = this.get(f).equals(other.get(f));
- }
- if (!result)
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
deleted file mode 100644
index 26bdd2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/protocol/types/Type.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.protocol.types;
-
-import org.apache.flink.kafka_backport.common.utils.Utils;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A serializable type
- */
-public abstract class Type {
-
- /**
- * Write the typed object to the buffer
- *
- * @throws SchemaException If the object is not valid for its type
- */
- public abstract void write(ByteBuffer buffer, Object o);
-
- /**
- * Read the typed object from the buffer
- *
- * @throws SchemaException If the object is not valid for its type
- */
- public abstract Object read(ByteBuffer buffer);
-
- /**
- * Validate the object. If succeeded return its typed object.
- *
- * @throws SchemaException If validation failed
- */
- public abstract Object validate(Object o);
-
- /**
- * Return the size of the object in bytes
- */
- public abstract int sizeOf(Object o);
-
- public static final Type INT8 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.put((Byte) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.get();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 1;
- }
-
- @Override
- public String toString() {
- return "INT8";
- }
-
- @Override
- public Byte validate(Object item) {
- if (item instanceof Byte)
- return (Byte) item;
- else
- throw new SchemaException(item + " is not a Byte.");
- }
- };
-
- public static final Type INT16 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putShort((Short) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getShort();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 2;
- }
-
- @Override
- public String toString() {
- return "INT16";
- }
-
- @Override
- public Short validate(Object item) {
- if (item instanceof Short)
- return (Short) item;
- else
- throw new SchemaException(item + " is not a Short.");
- }
- };
-
- public static final Type INT32 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putInt((Integer) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getInt();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 4;
- }
-
- @Override
- public String toString() {
- return "INT32";
- }
-
- @Override
- public Integer validate(Object item) {
- if (item instanceof Integer)
- return (Integer) item;
- else
- throw new SchemaException(item + " is not an Integer.");
- }
- };
-
- public static final Type INT64 = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- buffer.putLong((Long) o);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- return buffer.getLong();
- }
-
- @Override
- public int sizeOf(Object o) {
- return 8;
- }
-
- @Override
- public String toString() {
- return "INT64";
- }
-
- @Override
- public Long validate(Object item) {
- if (item instanceof Long)
- return (Long) item;
- else
- throw new SchemaException(item + " is not a Long.");
- }
- };
-
- public static final Type STRING = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- byte[] bytes = Utils.utf8((String) o);
- if (bytes.length > Short.MAX_VALUE)
- throw new SchemaException("String is longer than the maximum string length.");
- buffer.putShort((short) bytes.length);
- buffer.put(bytes);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- int length = buffer.getShort();
- byte[] bytes = new byte[length];
- buffer.get(bytes);
- return Utils.utf8(bytes);
- }
-
- @Override
- public int sizeOf(Object o) {
- return 2 + Utils.utf8Length((String) o);
- }
-
- @Override
- public String toString() {
- return "STRING";
- }
-
- @Override
- public String validate(Object item) {
- if (item instanceof String)
- return (String) item;
- else
- throw new SchemaException(item + " is not a String.");
- }
- };
-
- public static final Type BYTES = new Type() {
- @Override
- public void write(ByteBuffer buffer, Object o) {
- ByteBuffer arg = (ByteBuffer) o;
- int pos = arg.position();
- buffer.putInt(arg.remaining());
- buffer.put(arg);
- arg.position(pos);
- }
-
- @Override
- public Object read(ByteBuffer buffer) {
- int size = buffer.getInt();
- ByteBuffer val = buffer.slice();
- val.limit(size);
- buffer.position(buffer.position() + size);
- return val;
- }
-
- @Override
- public int sizeOf(Object o) {
- ByteBuffer buffer = (ByteBuffer) o;
- return 4 + buffer.remaining();
- }
-
- @Override
- public String toString() {
- return "BYTES";
- }
-
- @Override
- public ByteBuffer validate(Object item) {
- if (item instanceof ByteBuffer)
- return (ByteBuffer) item;
- else
- throw new SchemaException(item + " is not a java.nio.ByteBuffer.");
- }
- };
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
deleted file mode 100644
index 99a20a3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferInputStream.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.record;
-
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A byte buffer backed input outputStream
- */
-public class ByteBufferInputStream extends InputStream {
-
- private ByteBuffer buffer;
-
- public ByteBufferInputStream(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public int read() {
- if (!buffer.hasRemaining()) {
- return -1;
- }
- return buffer.get() & 0xFF;
- }
-
- public int read(byte[] bytes, int off, int len) {
- if (!buffer.hasRemaining()) {
- return -1;
- }
-
- len = Math.min(len, buffer.remaining());
- buffer.get(bytes, off, len);
- return len;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
deleted file mode 100644
index a334755..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/ByteBufferOutputStream.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.record;
-
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * A byte buffer backed output outputStream
- */
-public class ByteBufferOutputStream extends OutputStream {
-
- private static final float REALLOCATION_FACTOR = 1.1f;
-
- private ByteBuffer buffer;
-
- public ByteBufferOutputStream(ByteBuffer buffer) {
- this.buffer = buffer;
- }
-
- public void write(int b) {
- if (buffer.remaining() < 1)
- expandBuffer(buffer.capacity() + 1);
- buffer.put((byte) b);
- }
-
- public void write(byte[] bytes, int off, int len) {
- if (buffer.remaining() < len)
- expandBuffer(buffer.capacity() + len);
- buffer.put(bytes, off, len);
- }
-
- public ByteBuffer buffer() {
- return buffer;
- }
-
- private void expandBuffer(int size) {
- int expandSize = Math.max((int) (buffer.capacity() * REALLOCATION_FACTOR), size);
- ByteBuffer temp = ByteBuffer.allocate(expandSize);
- temp.put(buffer.array(), buffer.arrayOffset(), buffer.position());
- buffer = temp;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
deleted file mode 100644
index 9961766..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/record/CompressionType.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.record;
-
-// ----------------------------------------------------------------------------
-// This class is copied from the Apache Kafka project.
-//
-// The class is part of a "backport" of the new consumer API, in order to
-// give Flink access to its functionality until the API is properly released.
-//
-// This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * The compression type to use
- */
-public enum CompressionType {
- NONE(0, "none", 1.0f), GZIP(1, "gzip", 0.5f), SNAPPY(2, "snappy", 0.5f), LZ4(3, "lz4", 0.5f);
-
- public final int id;
- public final String name;
- public final float rate;
-
- private CompressionType(int id, String name, float rate) {
- this.id = id;
- this.name = name;
- this.rate = rate;
- }
-
- public static CompressionType forId(int id) {
- switch (id) {
- case 0:
- return NONE;
- case 1:
- return GZIP;
- case 2:
- return SNAPPY;
- case 3:
- return LZ4;
- default:
- throw new IllegalArgumentException("Unknown compression type id: " + id);
- }
- }
-
- public static CompressionType forName(String name) {
- if (NONE.name.equals(name))
- return NONE;
- else if (GZIP.name.equals(name))
- return GZIP;
- else if (SNAPPY.name.equals(name))
- return SNAPPY;
- else if (LZ4.name.equals(name))
- return LZ4;
- else
- throw new IllegalArgumentException("Unknown compression name: " + name);
- }
-
-}