You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/04 05:58:43 UTC
[4/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..101f382 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -28,346 +28,347 @@ import org.apache.kafka.common.protocol.types.Schema;
public class Protocol {
- public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
- new Field("api_version", INT16, "The version of the API."),
- new Field("correlation_id",
- INT32,
- "A user-supplied integer value that will be passed back with the response"),
- new Field("client_id",
- STRING,
- "A user specified identifier for the client making the request."));
-
- public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
- INT32,
- "The user-supplied value passed in with the request"));
+ public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
+ new Field("api_version", INT16, "The version of the API."),
+ new Field("correlation_id",
+ INT32,
+ "A user-supplied integer value that will be passed back with the response"),
+ new Field("client_id",
+ STRING,
+ "A user specified identifier for the client making the request."));
+
+ public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+ INT32,
+ "The user-supplied value passed in with the request"));
/* Metadata api */
- public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
+ public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics."));
- public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
- new Field("host", STRING, "The hostname of the broker."),
- new Field("port", INT32, "The port on which the broker accepts requests."));
+ public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+ new Field("host", STRING, "The hostname of the broker."),
+ new Field("port",
+ INT32,
+ "The port on which the broker accepts requests."));
- public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
- INT16,
- "The error code for the partition, if any."),
- new Field("partition_id", INT32, "The id of the partition."),
- new Field("leader",
- INT32,
- "The id of the broker acting as leader for this partition."),
- new Field("replicas",
- new ArrayOf(INT32),
- "The set of all nodes that host this partition."),
- new Field("isr",
- new ArrayOf(INT32),
- "The set of nodes that are in sync with the leader for this partition."));
-
- public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
- new Field("topic", STRING, "The name of the topic"),
- new Field("partition_metadata",
- new ArrayOf(PARTITION_METADATA_V0),
- "Metadata for each partition of the topic."));
-
- public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
- new ArrayOf(BROKER),
- "Host and port information for all brokers."),
- new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
-
- public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
- public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
+ public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+ INT16,
+ "The error code for the partition, if any."),
+ new Field("partition_id",
+ INT32,
+ "The id of the partition."),
+ new Field("leader",
+ INT32,
+ "The id of the broker acting as leader for this partition."),
+ new Field("replicas",
+ new ArrayOf(INT32),
+ "The set of all nodes that host this partition."),
+ new Field("isr",
+ new ArrayOf(INT32),
+ "The set of nodes that are in sync with the leader for this partition."));
+
+ public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
+ INT16,
+ "The error code for the given topic."),
+ new Field("topic", STRING, "The name of the topic"),
+ new Field("partition_metadata",
+ new ArrayOf(PARTITION_METADATA_V0),
+ "Metadata for each partition of the topic."));
+
+ public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+ new ArrayOf(BROKER),
+ "Host and port information for all brokers."),
+ new Field("topic_metadata",
+ new ArrayOf(TOPIC_METADATA_V0)));
+
+ public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
+ public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
/* Produce api */
- public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
- new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
- new Field("record_set", BYTES)))));
-
- public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
- INT16,
- "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
- new Field("timeout", INT32, "The time to await a response in ms."),
- new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
- public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(new Schema(new Field("partition",
- INT32),
- new Field("error_code",
- INT16),
- new Field("base_offset",
- INT64))))))));
-
- public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
- public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
-
- /* Offset commit api */
- public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Message offset to be committed."),
- new Field("timestamp",
- INT64,
- "Timestamp of the commit"),
- new Field("metadata",
- STRING,
- "Any associated metadata the client wants to keep."));
-
- public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to commit."),
- new Field("partitions",
- new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
- "Partitions to commit offsets."));
-
- public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("group_generation_id",
+ public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
+ new Field("data",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("record_set",
+ BYTES)))));
+
+ public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+ INT16,
+ "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+ new Field("timeout",
INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("topics",
- new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
- "Topics to commit offsets."));
-
- public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code",
- INT16));
-
- public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
- public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
- public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
- /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
- public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+ "The time to await a response in ms."),
+ new Field("topic_data",
+ new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+ public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(new Schema(new Field("topic",
+ STRING),
+ new Field("partition_responses",
+ new ArrayOf(new Schema(new Field("partition",
+ INT32),
+ new Field("error_code",
+ INT16),
+ new Field("base_offset",
+ INT64))))))));
+
+ public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
+ public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
- /* Offset fetch api */
- public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."));
+ /* Offset commit api */
+ public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Message offset to be committed."),
+ new Field("timestamp",
+ INT64,
+ "Timestamp of the commit"),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to commit."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+ "Partitions to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
- public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
STRING,
- "Topic to fetch offset."),
- new Field("partitions",
- new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch offsets."));
-
- public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("topics",
- new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch offsets."));
-
- public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("offset",
- INT64,
- "Last committed message offset."),
- new Field("metadata",
- STRING,
- "Any associated metadata the client wants to keep."),
- new Field("error_code",
- INT16));
+ "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("topics",
+ new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+ "Topics to commit offsets."));
+
+ public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16));
- public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+ public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
- public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+ public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
- public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
- public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+ public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1};
+ /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+ public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+
+ /* Offset fetch api */
+ public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to fetch offset."),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("topics",
+ new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch offsets."));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("offset",
+ INT64,
+ "Last committed message offset."),
+ new Field("metadata",
+ STRING,
+ "Any associated metadata the client wants to keep."),
+ new Field("error_code", INT16));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+ public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0};
+ public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0};
/* List offset api */
- public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("timestamp", INT64, "Timestamp."),
+ new Field("max_num_offsets",
+ INT32,
+ "Maximum offsets to return."));
+
+ public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+ STRING,
+ "Topic to list offset."),
+ new Field("partitions",
+ new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+ "Partitions to list offset."));
+
+ public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ INT32,
+ "Broker id of the follower. For normal consumers, use -1."),
+ new Field("topics",
+ new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+ "Topics to list offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ INT32,
+ "Topic partition id."),
+ new Field("error_code", INT16),
+ new Field("offsets",
+ new ArrayOf(INT64),
+ "A list of offsets."));
+
+ public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partition_responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+ public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+ public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
+ public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+
+ /* Fetch api */
+ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
- new Field("timestamp",
+ new Field("fetch_offset",
INT64,
- "Timestamp."),
- new Field("max_num_offsets",
+ "Message offset."),
+ new Field("max_bytes",
INT32,
- "Maximum offsets to return."));
+ "Maximum bytes to fetch."));
- public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to list offset."),
+ public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
new Field("partitions",
- new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
- "Partitions to list offset."));
+ new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+ "Partitions to fetch."));
- public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+ public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
INT32,
"Broker id of the follower. For normal consumers, use -1."),
+ new Field("max_wait_time",
+ INT32,
+ "Maximum time in ms to wait for the response."),
+ new Field("min_bytes",
+ INT32,
+ "Minimum bytes to accumulate in the response."),
new Field("topics",
- new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
- "Topics to list offsets."));
+ new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+ "Topics to fetch."));
- public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+ public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
INT32,
"Topic partition id."),
- new Field("error_code",
- INT16),
- new Field("offsets",
- new ArrayOf(INT64),
- "A list of offsets."));
+ new Field("error_code", INT16),
+ new Field("high_watermark",
+ INT64,
+ "Last committed offset."),
+ new Field("record_set", BYTES));
- public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
new Field("partition_responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
- public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
- public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 };
- public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 };
-
- /* Fetch api */
- public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("fetch_offset",
- INT64,
- "Message offset."),
- new Field("max_bytes",
- INT32,
- "Maximum bytes to fetch."));
-
- public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
- STRING,
- "Topic to fetch."),
- new Field("partitions",
- new ArrayOf(FETCH_REQUEST_PARTITION_V0),
- "Partitions to fetch."));
-
- public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
- INT32,
- "Broker id of the follower. For normal consumers, use -1."),
- new Field("max_wait_time",
- INT32,
- "Maximum time in ms to wait for the response."),
- new Field("min_bytes",
- INT32,
- "Minimum bytes to accumulate in the response."),
- new Field("topics",
- new ArrayOf(FETCH_REQUEST_TOPIC_V0),
- "Topics to fetch."));
-
- public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
- INT32,
- "Topic partition id."),
- new Field("error_code",
- INT16),
- new Field("high_watermark",
- INT64,
- "Last committed offset."),
- new Field("record_set", BYTES));
-
- public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partition_responses",
- new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+ new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
- public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
- new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+ public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+ new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
- public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
- public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+ public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
+ public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
/* Consumer metadata api */
- public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."));
+ public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."));
- public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16),
- new Field("coordinator",
- BROKER,
- "Host and port information for the coordinator for a consumer group."));
+ public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("coordinator",
+ BROKER,
+ "Host and port information for the coordinator for a consumer group."));
- public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
- public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+ public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
+ public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
/* Join group api */
- public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("session_timeout",
- INT32,
- "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
- new Field("topics",
- new ArrayOf(STRING),
- "An array of topics to subscribe to."),
- new Field("consumer_id",
- STRING,
- "The assigned consumer id or an empty string for a new consumer."),
- new Field("partition_assignment_strategy",
- STRING,
- "The strategy for the coordinator to assign partitions."));
-
- public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
- new Field("partitions", new ArrayOf(INT32)));
- public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16),
- new Field("group_generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."),
- new Field("assigned_partitions",
- new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+ public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+ STRING,
+ "The consumer group id."),
+ new Field("session_timeout",
+ INT32,
+ "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+ new Field("topics",
+ new ArrayOf(STRING),
+ "An array of topics to subscribe to."),
+ new Field("consumer_id",
+ STRING,
+ "The assigned consumer id or an empty string for a new consumer."),
+ new Field("partition_assignment_strategy",
+ STRING,
+ "The strategy for the coordinator to assign partitions."));
+
+ public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+ new Field("partitions", new ArrayOf(INT32)));
+ public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."),
+ new Field("assigned_partitions",
+ new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
- public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
- public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+ public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
+ public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
/* Heartbeat api */
- public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id",
- STRING,
- "The consumer group id."),
- new Field("group_generation_id",
- INT32,
- "The generation of the consumer group."),
- new Field("consumer_id",
- STRING,
- "The consumer id assigned by the group coordinator."));
+ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+ new Field("group_generation_id",
+ INT32,
+ "The generation of the consumer group."),
+ new Field("consumer_id",
+ STRING,
+ "The consumer id assigned by the group coordinator."));
- public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code",
- INT16));
+ public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
- public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
- public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+ public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+ public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
/* an array of all requests and responses with all schema versions */
- public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
- public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+ public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
/* the latest version of each api */
- public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+ public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
static {
REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
@@ -401,11 +402,8 @@ public class Protocol {
/* sanity check that we have the same number of request and response versions for each api */
for (ApiKeys api : ApiKeys.values())
if (REQUESTS[api.id].length != RESPONSES[api.id].length)
- throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
- + api.name
- + " but "
- + RESPONSES[api.id].length
- + " response versions.");
+ throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+ + " but " + RESPONSES[api.id].length + " response versions.");
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ee1f78f..ff89f0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -245,7 +245,7 @@ public class Struct {
public ByteBuffer[] toBytes() {
ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
writeTo(buffer);
- return new ByteBuffer[] { buffer };
+ return new ByteBuffer[] {buffer};
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index c7bd2f8..1c9fbaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
*/
public class ByteBufferOutputStream extends OutputStream {
- private static float REALLOCATION_FACTOR = 1.1f;
+ private static final float REALLOCATION_FACTOR = 1.1f;
private ByteBuffer buffer;
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index d684e68..e570b29 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -34,16 +34,15 @@ public class Compressor {
static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
- private static float[] typeToRate;
- private static int MAX_TYPE_ID = -1;
+ private static final float[] TYPE_TO_RATE;
static {
+ int maxTypeId = -1;
+ for (CompressionType type : CompressionType.values())
+ maxTypeId = Math.max(maxTypeId, type.id);
+ TYPE_TO_RATE = new float[maxTypeId + 1];
for (CompressionType type : CompressionType.values()) {
- MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
- }
- typeToRate = new float[MAX_TYPE_ID+1];
- for (CompressionType type : CompressionType.values()) {
- typeToRate[type.id] = type.rate;
+ TYPE_TO_RATE[type.id] = type.rate;
}
}
@@ -118,7 +117,7 @@ public class Compressor {
// update the compression ratio
float compressionRate = (float) buffer.position() / this.writtenUncompressed;
- typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+ TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
}
}
@@ -192,7 +191,7 @@ public class Compressor {
return bufferStream.buffer().position();
} else {
// estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
- return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+ return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
}
}
@@ -209,8 +208,8 @@ public class Compressor {
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
- Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
- OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
+ Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
+ OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
.newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
@@ -218,7 +217,7 @@ public class Compressor {
}
case LZ4:
try {
- Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+ Class<?> outputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
.newInstance(buffer);
return new DataOutputStream(stream);
@@ -244,8 +243,8 @@ public class Compressor {
// dynamically load the snappy class to avoid runtime dependency
// on snappy if we are not using it
try {
- Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
- InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class)
+ Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
+ InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
@@ -254,7 +253,7 @@ public class Compressor {
case LZ4:
// dynamically load LZ4 class to avoid runtime dependency
try {
- Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
+ Class<?> inputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
.newInstance(buffer);
return new DataInputStream(stream);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..f480da2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * <p>
+ * The frame header is read and validated in the constructor. Afterwards the stream decodes one block at a time into
+ * an internal buffer, and {@code read}/{@code skip} are served from that buffer. Frames that use a preset dictionary,
+ * a content size or a content checksum are rejected while the header is parsed (see {@link FLG}).
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    public static final String PREMATURE_EOS = "Stream ended prematurely";
+    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+    private final LZ4SafeDecompressor decompressor;
+    private final XXHash32 checksum;
+    // Holds the decoded (uncompressed) bytes of the current block.
+    private final byte[] buffer;
+    // Scratch space for the raw bytes of a compressed block before it is decompressed into `buffer`.
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private FLG flg;
+    private BD bd;
+    // Next position in `buffer` to hand out to the caller.
+    private int bufferOffset;
+    // Number of valid bytes currently held in `buffer`.
+    private int bufferSize;
+    // Set once the end mark (a zero block size) has been read.
+    private boolean finished;
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     *
+     * @param in The stream to decompress
+     * @throws IOException If the frame header cannot be read completely, does not start with the expected magic
+     *             number, or fails the descriptor checksum
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        super(in);
+        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        // The header must be parsed first: the block descriptor determines the buffer sizes below.
+        readHeader();
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[maxBlockSize];
+        bufferOffset = 0;
+        bufferSize = 0;
+        finished = false;
+    }
+
+    /**
+     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     *
+     * @throws IOException If the stream ends inside the header, the magic number is wrong, or the descriptor
+     *             checksum does not match
+     */
+    private void readHeader() throws IOException {
+        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+        // read first 6 bytes into buffer to check magic (4 bytes) and FLG/BD descriptor flags (1 byte each)
+        int headerLength = 6;
+        if (readFully(header, 0, headerLength) != headerLength) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        if (MAGIC != Utils.readUnsignedIntLE(header, headerLength - 6)) {
+            throw new IOException(NOT_SUPPORTED);
+        }
+        flg = FLG.fromByte(header[headerLength - 2]);
+        bd = BD.fromByte(header[headerLength - 1]);
+        // TODO read uncompressed content size, update flg.validate()
+        // TODO read dictionary id, update flg.validate()
+
+        // check stream descriptor hash
+        // NOTE(review): the hash covers the magic bytes as well as FLG/BD, mirroring
+        // KafkaLZ4BlockOutputStream#writeHeader. The LZ4 frame spec appears to hash the descriptor only - confirm
+        // before relying on interoperability with other LZ4 frame implementations.
+        byte expectedHash = (byte) ((checksum.hash(header, 0, headerLength, 0) >> 8) & 0xFF);
+        int storedHash = in.read();
+        if (storedHash < 0) {
+            // EOF must not be mistaken for a checksum byte of 0xFF
+            throw new IOException(PREMATURE_EOS);
+        }
+        if (expectedHash != (byte) storedHash) {
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes from the underlying stream, retrying after short reads. A single
+     * {@link InputStream#read(byte[], int, int)} call may legally return fewer bytes than requested, so its result
+     * alone cannot be used to detect the end of the stream.
+     *
+     * @param dest The array to fill
+     * @param off The offset in {@code dest} at which to start writing
+     * @param len The number of bytes wanted
+     * @return The number of bytes actually read; less than {@code len} only if the stream ended first
+     * @throws IOException If the underlying stream fails
+     */
+    private int readFully(byte[] dest, int off, int len) throws IOException {
+        int total = 0;
+        while (total < len) {
+            int count = in.read(dest, off + total, len - total);
+            if (count < 0) {
+                break;
+            }
+            total += count;
+        }
+        return total;
+    }
+
+    /**
+     * Decompresses (if necessary) buffered data, optionally computes and validates a XXHash32 checksum, and writes the
+     * result to a buffer.
+     *
+     * @throws IOException If the block is larger than the frame allows, the stream ends inside the block, the block
+     *             checksum does not match, or decompression fails
+     */
+    private void readBlock() throws IOException {
+        int blockSize = Utils.readUnsignedIntLE(in);
+
+        // Check for EndMark
+        if (blockSize == 0) {
+            finished = true;
+            // TODO implement content checksum, update flg.validate()
+            return;
+        }
+
+        // The high bit marks a block that was stored uncompressed; strip it before validating the size so that
+        // stored blocks are bounds-checked as well.
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        if (blockSize > maxBlockSize) {
+            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+        }
+
+        // Stored blocks are read straight into the output buffer; compressed ones go through the scratch buffer.
+        byte[] bufferToRead = compressed ? compressedBuffer : buffer;
+        if (readFully(bufferToRead, 0, blockSize) != blockSize) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        // verify checksum
+        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+            throw new IOException(BLOCK_HASH_MISMATCH);
+        }
+
+        if (compressed) {
+            try {
+                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+            } catch (LZ4Exception e) {
+                throw new IOException(e);
+            }
+        } else {
+            bufferSize = blockSize;
+        }
+
+        bufferOffset = 0;
+    }
+
+    /**
+     * Makes sure at least one decoded byte is buffered, reading further blocks as needed. Looping (rather than
+     * reading a single block) keeps a zero-length block from being served as stale buffer content.
+     *
+     * @return {@code true} if data is available, {@code false} if the end mark has been reached
+     * @throws IOException If reading a block fails
+     */
+    private boolean fillBuffer() throws IOException {
+        while (!finished && available() == 0) {
+            readBlock();
+        }
+        return !finished;
+    }
+
+    /**
+     * Reads the next decompressed byte.
+     *
+     * @return The byte as a value in the range 0-255, or -1 once the end mark has been reached
+     * @throws IOException If reading a block fails
+     */
+    @Override
+    public int read() throws IOException {
+        if (!fillBuffer()) {
+            return -1;
+        }
+        return buffer[bufferOffset++] & 0xFF;
+    }
+
+    /**
+     * Reads decompressed bytes from the current block. At most the remainder of the current block is returned, so
+     * the result may be smaller than {@code len}.
+     *
+     * @param b The destination array
+     * @param off The offset in {@code b} at which to start writing
+     * @param len The maximum number of bytes to read
+     * @return The number of bytes copied, 0 if {@code len} is 0, or -1 once the end mark has been reached
+     * @throws IOException If reading a block fails
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        if (len == 0) {
+            return 0;
+        }
+        if (!fillBuffer()) {
+            return -1;
+        }
+        int count = Math.min(len, available());
+        System.arraycopy(buffer, bufferOffset, b, off, count);
+        bufferOffset += count;
+        return count;
+    }
+
+    /**
+     * Skips decompressed bytes within the current block. At most the remainder of the current block is skipped.
+     *
+     * @param n The number of bytes to skip; values less than or equal to 0 skip nothing
+     * @return The number of bytes actually skipped
+     * @throws IOException If reading a block fails
+     */
+    @Override
+    public long skip(long n) throws IOException {
+        // A negative count must never move the read position backwards.
+        if (n <= 0 || !fillBuffer()) {
+            return 0;
+        }
+        int count = (int) Math.min(n, available());
+        bufferOffset += count;
+        return count;
+    }
+
+    /**
+     * @return The number of decoded bytes still unread in the current block
+     */
+    @Override
+    public int available() throws IOException {
+        return bufferSize - bufferOffset;
+    }
+
+    /**
+     * Closes the underlying stream.
+     */
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    /**
+     * Not supported; always throws.
+     */
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new RuntimeException("mark not supported");
+    }
+
+    /**
+     * Not supported; always throws.
+     */
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new RuntimeException("reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..6a2231f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * <p>
+ * The frame header is written in the constructor. Bytes written afterwards are collected in a block-sized buffer;
+ * each full (or flushed) buffer is emitted as one block. {@link #close()} emits any pending block followed by the
+ * end mark.
+ *
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+    /** Magic number written (little-endian) at the start of every frame. */
+    public static final int MAGIC = 0x184D2204;
+    /** Maximum length, in bytes, of a frame header. */
+    public static final int LZ4_MAX_HEADER_LENGTH = 19;
+    /** High bit of a block-size word; set when the block payload is stored uncompressed. */
+    public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+    public static final String CLOSED_STREAM = "The stream is already closed";
+
+    // Block size codes accepted by BD; the actual size in bytes is 2^(2 * code + 8).
+    public static final int BLOCKSIZE_64KB = 4;
+    public static final int BLOCKSIZE_256KB = 5;
+    public static final int BLOCKSIZE_1MB = 6;
+    public static final int BLOCKSIZE_4MB = 7;
+
+    private final LZ4Compressor compressor;
+    private final XXHash32 checksum;
+    private final FLG flg;
+    private final BD bd;
+    // Collects uncompressed bytes until a full block is available (also used as scratch space for the header).
+    private final byte[] buffer;
+    // Receives the compressed form of `buffer`; sized for the worst case reported by the compressor.
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    // Number of pending bytes in `buffer`.
+    private int bufferOffset;
+    // Set once the end mark has been written; no further data may be written afterwards.
+    private boolean finished;
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     *
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, a XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException If the frame header cannot be written to {@code out}
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        super(out);
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        bd = new BD(blockSize);
+        flg = new FLG(blockChecksum);
+        bufferOffset = 0;
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+        finished = false;
+        writeHeader();
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm. Block checksums are
+     * disabled.
+     *
+     * @param out The stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64kb, 5=256kb, 6=1mb, 7=4mb. All other
+     *            values will generate an exception
+     * @throws IOException If the frame header cannot be written to {@code out}
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+        this(out, blockSize, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm, with 64kb blocks and block
+     * checksums disabled.
+     *
+     * @param out The output stream to compress
+     * @throws IOException If the frame header cannot be written to {@code out}
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+        this(out, BLOCKSIZE_64KB);
+    }
+
+    /**
+     * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+     *
+     * @throws IOException If the underlying stream fails
+     */
+    private void writeHeader() throws IOException {
+        // The data buffer is still empty at this point, so it doubles as scratch space for the header.
+        Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        bufferOffset = 4;
+        buffer[bufferOffset++] = flg.toByte();
+        buffer[bufferOffset++] = bd.toByte();
+        // TODO write uncompressed content size, update flg.validate()
+        // TODO write dictionary id, update flg.validate()
+        // compute checksum on all descriptor fields
+        // NOTE(review): the hashed range starts at offset 0 and therefore includes the magic bytes. The LZ4 frame
+        // spec appears to hash the descriptor only - confirm before relying on interoperability with other LZ4
+        // frame implementations. KafkaLZ4BlockInputStream validates the header the same way.
+        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+        buffer[bufferOffset++] = (byte) hash;
+        // write out frame descriptor
+        out.write(buffer, 0, bufferOffset);
+        bufferOffset = 0;
+    }
+
+    /**
+     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+     * {@link OutputStream}. Does nothing when no data is buffered.
+     *
+     * @throws IOException If the underlying stream fails
+     */
+    private void writeBlock() throws IOException {
+        if (bufferOffset == 0) {
+            return;
+        }
+
+        int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+        byte[] bufferToWrite = compressedBuffer;
+        int compressMethod = 0;
+
+        // Store block uncompressed if compression did not make it smaller (incompressible)
+        if (compressedLength >= bufferOffset) {
+            bufferToWrite = buffer;
+            compressedLength = bufferOffset;
+            compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        }
+
+        // Write content: the size word (with the "stored" flag in its high bit) followed by the payload
+        Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        out.write(bufferToWrite, 0, compressedLength);
+
+        // Calculate and write block checksum
+        if (flg.isBlockChecksumSet()) {
+            int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+            Utils.writeUnsignedIntLE(out, hash);
+        }
+        bufferOffset = 0;
+    }
+
+    /**
+     * Similar to the {@link #writeBlock()} method. Writes a 0-length block (without block checksum) to signal the end
+     * of the block stream.
+     *
+     * @throws IOException If the underlying stream fails
+     */
+    private void writeEndMark() throws IOException {
+        Utils.writeUnsignedIntLE(out, 0);
+        // TODO implement content checksum, update flg.validate()
+        finished = true;
+    }
+
+    /**
+     * Buffers a single byte, emitting the current block first if the buffer is full.
+     *
+     * @param b The byte to write (only the low 8 bits are used)
+     * @throws IOException If the underlying stream fails
+     * @throws IllegalStateException If the stream has already been closed
+     */
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (bufferOffset == maxBlockSize) {
+            writeBlock();
+        }
+        buffer[bufferOffset++] = (byte) b;
+    }
+
+    /**
+     * Buffers {@code len} bytes from {@code b}, emitting a block every time the buffer fills up.
+     *
+     * @param b The source array
+     * @param off The offset of the first byte to write
+     * @param len The number of bytes to write
+     * @throws IOException If the underlying stream fails
+     * @throws IllegalStateException If the stream has already been closed
+     */
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        int bufferRemainingLength = maxBlockSize - bufferOffset;
+        // while b will fill the buffer
+        while (len > bufferRemainingLength) {
+            // fill remaining space in buffer
+            System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+            bufferOffset = maxBlockSize;
+            writeBlock();
+            // compute new offset and length
+            off += bufferRemainingLength;
+            len -= bufferRemainingLength;
+            bufferRemainingLength = maxBlockSize;
+        }
+
+        // whatever is left fits into the buffer and stays pending until the next block is written
+        System.arraycopy(b, off, buffer, bufferOffset, len);
+        bufferOffset += len;
+    }
+
+    /**
+     * Emits any pending data as a block (unless the end mark has already been written) and flushes the underlying
+     * stream.
+     *
+     * @throws IOException If the underlying stream fails
+     */
+    @Override
+    public void flush() throws IOException {
+        if (!finished) {
+            writeBlock();
+        }
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /**
+     * A simple state check to ensure the stream is still open.
+     *
+     * @throws IllegalStateException If the end mark has already been written
+     */
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException(CLOSED_STREAM);
+        }
+    }
+
+    /**
+     * Writes any pending data followed by the end mark, then flushes and closes the underlying stream. Calling this
+     * method more than once has no further effect.
+     *
+     * @throws IOException If the underlying stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            if (!finished) {
+                // The pending block has to go out BEFORE the end mark: writeEndMark() marks the stream as
+                // finished, after which flush() no longer writes buffered data.
+                writeBlock();
+                writeEndMark();
+            }
+            if (out != null) {
+                out.flush();
+            }
+        } finally {
+            // Release the underlying stream even if the final write failed, and never retry the write.
+            finished = true;
+            if (out != null) {
+                try {
+                    out.close();
+                } finally {
+                    out = null;
+                }
+            }
+        }
+    }
+
+    /**
+     * The FLG byte of the frame descriptor. Bit layout (least significant bit first): preset dictionary, reserved,
+     * content checksum, content size, block checksum, block independence, and a two-bit version.
+     */
+    public static class FLG {
+
+        private static final int VERSION = 1;
+
+        private final int presetDictionary;
+        private final int reserved1;
+        private final int contentChecksum;
+        private final int contentSize;
+        private final int blockChecksum;
+        private final int blockIndependence;
+        private final int version;
+
+        /**
+         * Creates a descriptor with block checksums disabled.
+         */
+        public FLG() {
+            this(false);
+        }
+
+        /**
+         * Creates a descriptor for independent blocks with the supported version.
+         *
+         * @param blockChecksum Whether every block is followed by a checksum
+         */
+        public FLG(boolean blockChecksum) {
+            this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+        }
+
+        private FLG(int presetDictionary,
+                    int reserved1,
+                    int contentChecksum,
+                    int contentSize,
+                    int blockChecksum,
+                    int blockIndependence,
+                    int version) {
+            this.presetDictionary = presetDictionary;
+            this.reserved1 = reserved1;
+            this.contentChecksum = contentChecksum;
+            this.contentSize = contentSize;
+            this.blockChecksum = blockChecksum;
+            this.blockIndependence = blockIndependence;
+            this.version = version;
+            validate();
+        }
+
+        /**
+         * Decodes a FLG byte.
+         *
+         * @param flg The encoded byte
+         * @return The decoded descriptor
+         * @throws RuntimeException If the byte requests a feature or version that is not supported
+         */
+        public static FLG fromByte(byte flg) {
+            int presetDictionary = (flg >>> 0) & 1;
+            int reserved1 = (flg >>> 1) & 1;
+            int contentChecksum = (flg >>> 2) & 1;
+            int contentSize = (flg >>> 3) & 1;
+            int blockChecksum = (flg >>> 4) & 1;
+            int blockIndependence = (flg >>> 5) & 1;
+            int version = (flg >>> 6) & 3;
+
+            return new FLG(presetDictionary,
+                           reserved1,
+                           contentChecksum,
+                           contentSize,
+                           blockChecksum,
+                           blockIndependence,
+                           version);
+        }
+
+        /**
+         * @return The encoded FLG byte
+         */
+        public byte toByte() {
+            return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+                    | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+        }
+
+        /**
+         * Rejects every combination this partial implementation cannot handle.
+         */
+        private void validate() {
+            if (presetDictionary != 0) {
+                throw new RuntimeException("Preset dictionary is unsupported");
+            }
+            if (reserved1 != 0) {
+                throw new RuntimeException("Reserved1 field must be 0");
+            }
+            if (contentChecksum != 0) {
+                throw new RuntimeException("Content checksum is unsupported");
+            }
+            if (contentSize != 0) {
+                throw new RuntimeException("Content size is unsupported");
+            }
+            if (blockIndependence != 1) {
+                throw new RuntimeException("Dependent block stream is unsupported");
+            }
+            if (version != VERSION) {
+                throw new RuntimeException(String.format("Version %d is unsupported", version));
+            }
+        }
+
+        public boolean isPresetDictionarySet() {
+            return presetDictionary == 1;
+        }
+
+        public boolean isContentChecksumSet() {
+            return contentChecksum == 1;
+        }
+
+        public boolean isContentSizeSet() {
+            return contentSize == 1;
+        }
+
+        public boolean isBlockChecksumSet() {
+            return blockChecksum == 1;
+        }
+
+        public boolean isBlockIndependenceSet() {
+            return blockIndependence == 1;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+    }
+
+    /**
+     * The BD byte of the frame descriptor. Bit layout (least significant bit first): four reserved bits, a three-bit
+     * block size code, and one reserved bit.
+     */
+    public static class BD {
+
+        private final int reserved2;
+        private final int blockSizeValue;
+        private final int reserved3;
+
+        /**
+         * Creates a descriptor for 64kb blocks.
+         */
+        public BD() {
+            this(0, BLOCKSIZE_64KB, 0);
+        }
+
+        /**
+         * Creates a descriptor for the given block size code.
+         *
+         * @param blockSizeValue The block size code; must be between 4 and 7
+         * @throws RuntimeException If the code is out of range
+         */
+        public BD(int blockSizeValue) {
+            this(0, blockSizeValue, 0);
+        }
+
+        private BD(int reserved2, int blockSizeValue, int reserved3) {
+            this.reserved2 = reserved2;
+            this.blockSizeValue = blockSizeValue;
+            this.reserved3 = reserved3;
+            validate();
+        }
+
+        /**
+         * Decodes a BD byte.
+         *
+         * @param bd The encoded byte
+         * @return The decoded descriptor
+         * @throws RuntimeException If a reserved bit is set or the block size code is out of range
+         */
+        public static BD fromByte(byte bd) {
+            int reserved2 = (bd >>> 0) & 15;
+            int blockMaximumSize = (bd >>> 4) & 7;
+            int reserved3 = (bd >>> 7) & 1;
+
+            return new BD(reserved2, blockMaximumSize, reserved3);
+        }
+
+        private void validate() {
+            if (reserved2 != 0) {
+                throw new RuntimeException("Reserved2 field must be 0");
+            }
+            if (blockSizeValue < 4 || blockSizeValue > 7) {
+                throw new RuntimeException("Block size value must be between 4 and 7");
+            }
+            if (reserved3 != 0) {
+                throw new RuntimeException("Reserved3 field must be 0");
+            }
+        }
+
+        /**
+         * @return The maximum block size in bytes, i.e. 2^(2n+8) for block size code n
+         */
+        public int getBlockMaximumSize() {
+            return 1 << ((2 * blockSizeValue) + 8);
+        }
+
+        /**
+         * @return The encoded BD byte
+         */
+        public byte toByte() {
+            return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index cc4084f..083e7a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -164,21 +164,21 @@ public class MemoryRecords implements Records {
@Override
public String toString() {
- Iterator<LogEntry> iter = iterator();
- StringBuilder builder = new StringBuilder();
- builder.append('[');
- while(iter.hasNext()) {
- LogEntry entry = iter.next();
- builder.append('(');
- builder.append("offset=");
- builder.append(entry.offset());
- builder.append(",");
- builder.append("record=");
- builder.append(entry.record());
- builder.append(")");
- }
- builder.append(']');
- return builder.toString();
+ Iterator<LogEntry> iter = iterator();
+ StringBuilder builder = new StringBuilder();
+ builder.append('[');
+ while (iter.hasNext()) {
+ LogEntry entry = iter.next();
+ builder.append('(');
+ builder.append("offset=");
+ builder.append(entry.offset());
+ builder.append(",");
+ builder.append("record=");
+ builder.append(entry.record());
+ builder.append(")");
+ }
+ builder.append(']');
+ return builder.toString();
}
public static class RecordsIterator extends AbstractIterator<LogEntry> {
@@ -218,8 +218,8 @@ public class MemoryRecords implements Records {
if (type == CompressionType.NONE) {
rec = buffer.slice();
int newPos = buffer.position() + size;
- if(newPos > buffer.limit())
- return allDone();
+ if (newPos > buffer.limit())
+ return allDone();
buffer.position(newPos);
rec.limit(size);
} else {
@@ -251,7 +251,7 @@ public class MemoryRecords implements Records {
}
private boolean innerDone() {
- return (innerIter == null || !innerIter.hasNext());
+ return innerIter == null || !innerIter.hasNext();
}
}
}