Posted to commits@kafka.apache.org by jk...@apache.org on 2015/02/04 05:58:43 UTC

[4/6] kafka git commit: KAFKA-1915: Add checkstyle for java code.
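
The change below is mechanical: the new checkstyle rules require public static fields that are never reassigned to be declared final, constants to use UPPER_SNAKE_CASE names, array initializers to drop the inner brace padding ({FOO} rather than { FOO }), and long constructor calls to be re-wrapped with aligned continuations. A minimal before/after sketch of the pattern, with hypothetical names (nothing in this sketch is from the commit itself):

    // Before: rejected by the new checks (non-final mutable "constant",
    // lowerCamelCase name, padded array initializer).
    class StyleBefore {
        public static int defaultTimeoutMs = 30000;
        public static String[] apiNames = new String[] { "produce", "fetch" };
    }

    // After: final fields, UPPER_SNAKE_CASE constant names, and the
    // tightened {ELEMENT} brace style used throughout the diff below.
    class StyleAfter {
        public static final int DEFAULT_TIMEOUT_MS = 30000;
        public static final String[] API_NAMES = new String[] {"produce", "fetch"};
    }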

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..101f382 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -28,346 +28,347 @@ import org.apache.kafka.common.protocol.types.Schema;
 
 public class Protocol {
 
-    public static Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
-                                                     new Field("api_version", INT16, "The version of the API."),
-                                                     new Field("correlation_id",
-                                                               INT32,
-                                                               "A user-supplied integer value that will be passed back with the response"),
-                                                     new Field("client_id",
-                                                               STRING,
-                                                               "A user specified identifier for the client making the request."));
-
-    public static Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
-                                                                INT32,
-                                                                "The user-supplied value passed in with the request"));
+    public static final Schema REQUEST_HEADER = new Schema(new Field("api_key", INT16, "The id of the request type."),
+                                                           new Field("api_version", INT16, "The version of the API."),
+                                                           new Field("correlation_id",
+                                                                     INT32,
+                                                                     "A user-supplied integer value that will be passed back with the response"),
+                                                           new Field("client_id",
+                                                                     STRING,
+                                                                     "A user specified identifier for the client making the request."));
+
+    public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
+                                                                      INT32,
+                                                                      "The user-supplied value passed in with the request"));
 
     /* Metadata api */
 
-    public static Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
-                                                                    new ArrayOf(STRING),
-                                                                    "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
+    public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics",
+                                                                          new ArrayOf(STRING),
+                                                                          "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics."));
 
-    public static Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
-                                             new Field("host", STRING, "The hostname of the broker."),
-                                             new Field("port", INT32, "The port on which the broker accepts requests."));
+    public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."),
+                                                   new Field("host", STRING, "The hostname of the broker."),
+                                                   new Field("port",
+                                                             INT32,
+                                                             "The port on which the broker accepts requests."));
 
-    public static Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
-                                                                      INT16,
-                                                                      "The error code for the partition, if any."),
-                                                            new Field("partition_id", INT32, "The id of the partition."),
-                                                            new Field("leader",
-                                                                      INT32,
-                                                                      "The id of the broker acting as leader for this partition."),
-                                                            new Field("replicas",
-                                                                      new ArrayOf(INT32),
-                                                                      "The set of all nodes that host this partition."),
-                                                            new Field("isr",
-                                                                      new ArrayOf(INT32),
-                                                                      "The set of nodes that are in sync with the leader for this partition."));
-
-    public static Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code", INT16, "The error code for the given topic."),
-                                                        new Field("topic", STRING, "The name of the topic"),
-                                                        new Field("partition_metadata",
-                                                                  new ArrayOf(PARTITION_METADATA_V0),
-                                                                  "Metadata for each partition of the topic."));
-
-    public static Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
-                                                                     new ArrayOf(BROKER),
-                                                                     "Host and port information for all brokers."),
-                                                           new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V0)));
-
-    public static Schema[] METADATA_REQUEST = new Schema[] { METADATA_REQUEST_V0 };
-    public static Schema[] METADATA_RESPONSE = new Schema[] { METADATA_RESPONSE_V0 };
+    public static final Schema PARTITION_METADATA_V0 = new Schema(new Field("partition_error_code",
+                                                                            INT16,
+                                                                            "The error code for the partition, if any."),
+                                                                  new Field("partition_id",
+                                                                            INT32,
+                                                                            "The id of the partition."),
+                                                                  new Field("leader",
+                                                                            INT32,
+                                                                            "The id of the broker acting as leader for this partition."),
+                                                                  new Field("replicas",
+                                                                            new ArrayOf(INT32),
+                                                                            "The set of all nodes that host this partition."),
+                                                                  new Field("isr",
+                                                                            new ArrayOf(INT32),
+                                                                            "The set of nodes that are in sync with the leader for this partition."));
+
+    public static final Schema TOPIC_METADATA_V0 = new Schema(new Field("topic_error_code",
+                                                                        INT16,
+                                                                        "The error code for the given topic."),
+                                                              new Field("topic", STRING, "The name of the topic"),
+                                                              new Field("partition_metadata",
+                                                                        new ArrayOf(PARTITION_METADATA_V0),
+                                                                        "Metadata for each partition of the topic."));
+
+    public static final Schema METADATA_RESPONSE_V0 = new Schema(new Field("brokers",
+                                                                           new ArrayOf(BROKER),
+                                                                           "Host and port information for all brokers."),
+                                                                 new Field("topic_metadata",
+                                                                           new ArrayOf(TOPIC_METADATA_V0)));
+
+    public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0};
 
     /* Produce api */
 
-    public static Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
-                                                            new Field("data", new ArrayOf(new Schema(new Field("partition", INT32),
-                                                                                                     new Field("record_set", BYTES)))));
-
-    public static Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
-                                                                   INT16,
-                                                                   "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
-                                                         new Field("timeout", INT32, "The time to await a response in ms."),
-                                                         new Field("topic_data", new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
-
-    public static Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                    new ArrayOf(new Schema(new Field("topic", STRING),
-                                                                                           new Field("partition_responses",
-                                                                                                     new ArrayOf(new Schema(new Field("partition",
-                                                                                                                                      INT32),
-                                                                                                                            new Field("error_code",
-                                                                                                                                      INT16),
-                                                                                                                            new Field("base_offset",
-                                                                                                                                      INT64))))))));
-
-    public static Schema[] PRODUCE_REQUEST = new Schema[] { PRODUCE_REQUEST_V0 };
-    public static Schema[] PRODUCE_RESPONSE = new Schema[] { PRODUCE_RESPONSE_V0 };
-
-    /* Offset commit api */
-    public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                   INT32,
-                                                                                   "Topic partition id."),
-                                                                         new Field("offset",
-                                                                                   INT64,
-                                                                                   "Message offset to be committed."),
-                                                                         new Field("timestamp",
-                                                                                   INT64,
-                                                                                   "Timestamp of the commit"),
-                                                                         new Field("metadata",
-                                                                                   STRING,
-                                                                                   "Any associated metadata the client wants to keep."));
-
-    public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                                STRING,
-                                                                                "Topic to commit."),
-                                                                       new Field("partitions",
-                                                                                 new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
-                                                                                 "Partitions to commit offsets."));
-
-    public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                         STRING,
-                                                                         "The consumer group id."),
-                                                               new Field("topics",
-                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
-                                                                         "Topics to commit offsets."));
-
-    public static Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
-                                                                         STRING,
-                                                                         "The consumer group id."),
-                                                               new Field("group_generation_id",
+    public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING),
+                                                                  new Field("data",
+                                                                            new ArrayOf(new Schema(new Field("partition",
+                                                                                                             INT32),
+                                                                                                   new Field("record_set",
+                                                                                                             BYTES)))));
+
+    public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks",
+                                                                         INT16,
+                                                                         "The number of nodes that should replicate the produce before returning. -1 indicates the full ISR."),
+                                                               new Field("timeout",
                                                                          INT32,
-                                                                         "The generation of the consumer group."),
-                                                               new Field("consumer_id",
-                                                                         STRING,
-                                                                         "The consumer id assigned by the group coordinator."),
-                                                               new Field("topics",
-                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
-                                                                         "Topics to commit offsets."));
-
-    public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                    INT32,
-                                                                                    "Topic partition id."),
-                                                                          new Field("error_code",
-                                                                                    INT16));
-
-    public static Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                      new Field("partition_responses",
-                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
-
-    public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                          new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
-
-    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
-    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
-    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+                                                                         "The time to await a response in ms."),
+                                                               new Field("topic_data",
+                                                                         new ArrayOf(TOPIC_PRODUCE_DATA_V0)));
+
+    public static final Schema PRODUCE_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                          new ArrayOf(new Schema(new Field("topic",
+                                                                                                           STRING),
+                                                                                                 new Field("partition_responses",
+                                                                                                           new ArrayOf(new Schema(new Field("partition",
+                                                                                                                                            INT32),
+                                                                                                                                  new Field("error_code",
+                                                                                                                                            INT16),
+                                                                                                                                  new Field("base_offset",
+                                                                                                                                            INT64))))))));
+
+    public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0};
+    public static final Schema[] PRODUCE_RESPONSE = new Schema[] {PRODUCE_RESPONSE_V0};
 
-    /* Offset fetch api */
-    public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                  INT32,
-                                                                                  "Topic partition id."));
+    /* Offset commit api */
+    public static final Schema OFFSET_COMMIT_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                         INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         INT64,
+                                                                                         "Message offset to be committed."),
+                                                                               new Field("timestamp",
+                                                                                         INT64,
+                                                                                         "Timestamp of the commit"),
+                                                                               new Field("metadata",
+                                                                                         STRING,
+                                                                                         "Any associated metadata the client wants to keep."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                     STRING,
+                                                                                     "Topic to commit."),
+                                                                           new Field("partitions",
+                                                                                     new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
+                                                                                     "Partitions to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                               STRING,
+                                                                               "The consumer group id."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                               "Topics to commit offsets."));
 
-    public static Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+    public static final Schema OFFSET_COMMIT_REQUEST_V1 = new Schema(new Field("group_id",
                                                                                STRING,
-                                                                               "Topic to fetch offset."),
-                                                                     new Field("partitions",
-                                                                                new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
-                                                                                "Partitions to fetch offsets."));
-
-    public static Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                        STRING,
-                                                                        "The consumer group id."),
-                                                              new Field("topics",
-                                                                        new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
-                                                                        "Topics to fetch offsets."));
-
-    public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                                   INT32,
-                                                                                   "Topic partition id."),
-                                                                         new Field("offset",
-                                                                                   INT64,
-                                                                                   "Last committed message offset."),
-                                                                         new Field("metadata",
-                                                                                   STRING,
-                                                                                   "Any associated metadata the client wants to keep."),
-                                                                         new Field("error_code",
-                                                                                   INT16));
+                                                                               "The consumer group id."),
+                                                                     new Field("group_generation_id",
+                                                                               INT32,
+                                                                               "The generation of the consumer group."),
+                                                                     new Field("consumer_id",
+                                                                               STRING,
+                                                                               "The consumer id assigned by the group coordinator."),
+                                                                     new Field("topics",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                               "Topics to commit offsets."));
+
+    public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                          INT32,
+                                                                                          "Topic partition id."),
+                                                                                new Field("error_code", INT16));
 
-    public static Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                     new Field("partition_responses",
-                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+    public static final Schema OFFSET_COMMIT_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                            new Field("partition_responses",
+                                                                                      new ArrayOf(OFFSET_COMMIT_RESPONSE_PARTITION_V0)));
 
-    public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                         new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+    public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                                new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
-    public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+    public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1};
+    /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
+    public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+
+    /* Offset fetch api */
+    public static final Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                        INT32,
+                                                                                        "Topic partition id."));
+
+    public static final Schema OFFSET_FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                    STRING,
+                                                                                    "Topic to fetch offset."),
+                                                                          new Field("partitions",
+                                                                                    new ArrayOf(OFFSET_FETCH_REQUEST_PARTITION_V0),
+                                                                                    "Partitions to fetch offsets."));
+
+    public static final Schema OFFSET_FETCH_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                              STRING,
+                                                                              "The consumer group id."),
+                                                                    new Field("topics",
+                                                                              new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
+                                                                              "Topics to fetch offsets."));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                         INT32,
+                                                                                         "Topic partition id."),
+                                                                               new Field("offset",
+                                                                                         INT64,
+                                                                                         "Last committed message offset."),
+                                                                               new Field("metadata",
+                                                                                         STRING,
+                                                                                         "Any associated metadata the client wants to keep."),
+                                                                               new Field("error_code", INT16));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                           new Field("partition_responses",
+                                                                                     new ArrayOf(OFFSET_FETCH_RESPONSE_PARTITION_V0)));
+
+    public static final Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                               new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0};
+    public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0};
 
     /* List offset api */
-    public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+    public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                       INT32,
+                                                                                       "Topic partition id."),
+                                                                             new Field("timestamp", INT64, "Timestamp."),
+                                                                             new Field("max_num_offsets",
+                                                                                       INT32,
+                                                                                       "Maximum offsets to return."));
+
+    public static final Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
+                                                                                   STRING,
+                                                                                   "Topic to list offset."),
+                                                                         new Field("partitions",
+                                                                                   new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
+                                                                                   "Partitions to list offset."));
+
+    public static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+                                                                             INT32,
+                                                                             "Broker id of the follower. For normal consumers, use -1."),
+                                                                   new Field("topics",
+                                                                             new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
+                                                                             "Topics to list offsets."));
+
+    public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+                                                                                        INT32,
+                                                                                        "Topic partition id."),
+                                                                              new Field("error_code", INT16),
+                                                                              new Field("offsets",
+                                                                                        new ArrayOf(INT64),
+                                                                                        "A list of offsets."));
+
+    public static final Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                          new Field("partition_responses",
+                                                                                    new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
+
+    public static final Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
+
+    public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0};
+    public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0};
+
+    /* Fetch api */
+    public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
                                                                                  INT32,
                                                                                  "Topic partition id."),
-                                                                       new Field("timestamp",
+                                                                       new Field("fetch_offset",
                                                                                  INT64,
-                                                                                 "Timestamp."),
-                                                                       new Field("max_num_offsets",
+                                                                                 "Message offset."),
+                                                                       new Field("max_bytes",
                                                                                  INT32,
-                                                                                 "Maximum offsets to return."));
+                                                                                 "Maximum bytes to fetch."));
 
-    public static Schema LIST_OFFSET_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                             STRING,
-                                                                             "Topic to list offset."),
+    public static final Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic to fetch."),
                                                                    new Field("partitions",
-                                                                             new ArrayOf(LIST_OFFSET_REQUEST_PARTITION_V0),
-                                                                             "Partitions to list offset."));
+                                                                             new ArrayOf(FETCH_REQUEST_PARTITION_V0),
+                                                                             "Partitions to fetch."));
 
-    public static Schema LIST_OFFSET_REQUEST_V0 = new Schema(new Field("replica_id",
+    public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
                                                                        INT32,
                                                                        "Broker id of the follower. For normal consumers, use -1."),
+                                                             new Field("max_wait_time",
+                                                                       INT32,
+                                                                       "Maximum time in ms to wait for the response."),
+                                                             new Field("min_bytes",
+                                                                       INT32,
+                                                                       "Minimum bytes to accumulate in the response."),
                                                              new Field("topics",
-                                                                        new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V0),
-                                                                        "Topics to list offsets."));
+                                                                       new ArrayOf(FETCH_REQUEST_TOPIC_V0),
+                                                                       "Topics to fetch."));
 
-    public static Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
+    public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                   INT32,
                                                                                   "Topic partition id."),
-                                                                        new Field("error_code",
-                                                                                  INT16),
-                                                                        new Field("offsets",
-                                                                                  new ArrayOf(INT64),
-                                                                                  "A list of offsets."));
+                                                                        new Field("error_code", INT16),
+                                                                        new Field("high_watermark",
+                                                                                  INT64,
+                                                                                  "Last committed offset."),
+                                                                        new Field("record_set", BYTES));
 
-    public static Schema LIST_OFFSET_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+    public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
                                                                     new Field("partition_responses",
-                                                                              new ArrayOf(LIST_OFFSET_RESPONSE_PARTITION_V0)));
-
-    public static Schema LIST_OFFSET_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                  new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V0)));
-
-    public static Schema[] LIST_OFFSET_REQUEST = new Schema[] { LIST_OFFSET_REQUEST_V0 };
-    public static Schema[] LIST_OFFSET_RESPONSE = new Schema[] { LIST_OFFSET_RESPONSE_V0 };
-
-    /* Fetch api */
-    public static Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
-                                                                           INT32,
-                                                                           "Topic partition id."),
-                                                                 new Field("fetch_offset",
-                                                                           INT64,
-                                                                           "Message offset."),
-                                                                 new Field("max_bytes",
-                                                                           INT32,
-                                                                           "Maximum bytes to fetch."));
-
-    public static Schema FETCH_REQUEST_TOPIC_V0 = new Schema(new Field("topic",
-                                                                       STRING,
-                                                                       "Topic to fetch."),
-                                                             new Field("partitions",
-                                                                       new ArrayOf(FETCH_REQUEST_PARTITION_V0),
-                                                                       "Partitions to fetch."));
-
-    public static Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
-                                                                 INT32,
-                                                                 "Broker id of the follower. For normal consumers, use -1."),
-                                                       new Field("max_wait_time",
-                                                                 INT32,
-                                                                 "Maximum time in ms to wait for the response."),
-                                                       new Field("min_bytes",
-                                                                 INT32,
-                                                                 "Minimum bytes to accumulate in the response."),
-                                                       new Field("topics",
-                                                                 new ArrayOf(FETCH_REQUEST_TOPIC_V0),
-                                                                 "Topics to fetch."));
-
-    public static Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
-                                                                            INT32,
-                                                                            "Topic partition id."),
-                                                                  new Field("error_code",
-                                                                            INT16),
-                                                                  new Field("high_watermark",
-                                                                            INT64,
-                                                                            "Last committed offset."),
-                                                                  new Field("record_set", BYTES));
-
-    public static Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                              new Field("partition_responses",
-                                                                        new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
+                                                                              new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
 
-    public static Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
-                                                                  new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
+    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+                                                                        new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] FETCH_REQUEST = new Schema[] { FETCH_REQUEST_V0 };
-    public static Schema[] FETCH_RESPONSE = new Schema[] { FETCH_RESPONSE_V0 };
+    public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0};
+    public static final Schema[] FETCH_RESPONSE = new Schema[] {FETCH_RESPONSE_V0};
 
     /* Consumer metadata api */
-    public static Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                             STRING,
-                                                                             "The consumer group id."));
+    public static final Schema CONSUMER_METADATA_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                                   STRING,
+                                                                                   "The consumer group id."));
 
-    public static Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code",
-                                                                              INT16),
-                                                                    new Field("coordinator",
-                                                                              BROKER,
-                                                                              "Host and port information for the coordinator for a consumer group."));
+    public static final Schema CONSUMER_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                          new Field("coordinator",
+                                                                                    BROKER,
+                                                                                    "Host and port information for the coordinator for a consumer group."));
 
-    public static Schema[] CONSUMER_METADATA_REQUEST = new Schema[] { CONSUMER_METADATA_REQUEST_V0 };
-    public static Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] { CONSUMER_METADATA_RESPONSE_V0 };
+    public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0};
+    public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0};
 
     /* Join group api */
-    public static Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                      STRING,
-                                                                      "The consumer group id."),
-                                                            new Field("session_timeout",
-                                                                      INT32,
-                                                                      "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
-                                                            new Field("topics",
-                                                                      new ArrayOf(STRING),
-                                                                      "An array of topics to subscribe to."),
-                                                            new Field("consumer_id",
-                                                                      STRING,
-                                                                      "The assigned consumer id or an empty string for a new consumer."),
-                                                            new Field("partition_assignment_strategy",
-                                                                      STRING,
-                                                                      "The strategy for the coordinator to assign partitions."));
-
-    public static Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
-                                                                   new Field("partitions", new ArrayOf(INT32)));
-    public static Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code",
-                                                                       INT16),
-                                                             new Field("group_generation_id",
-                                                                       INT32,
-                                                                       "The generation of the consumer group."),
-                                                             new Field("consumer_id",
-                                                                       STRING,
-                                                                       "The consumer id assigned by the group coordinator."),
-                                                             new Field("assigned_partitions",
-                                                                       new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
+    public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id",
+                                                                            STRING,
+                                                                            "The consumer group id."),
+                                                                  new Field("session_timeout",
+                                                                            INT32,
+                                                                            "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms."),
+                                                                  new Field("topics",
+                                                                            new ArrayOf(STRING),
+                                                                            "An array of topics to subscribe to."),
+                                                                  new Field("consumer_id",
+                                                                            STRING,
+                                                                            "The assigned consumer id or an empty string for a new consumer."),
+                                                                  new Field("partition_assignment_strategy",
+                                                                            STRING,
+                                                                            "The strategy for the coordinator to assign partitions."));
+
+    public static final Schema JOIN_GROUP_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
+                                                                         new Field("partitions", new ArrayOf(INT32)));
+    public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
+                                                                   new Field("group_generation_id",
+                                                                             INT32,
+                                                                             "The generation of the consumer group."),
+                                                                   new Field("consumer_id",
+                                                                             STRING,
+                                                                             "The consumer id assigned by the group coordinator."),
+                                                                   new Field("assigned_partitions",
+                                                                             new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] JOIN_GROUP_REQUEST = new Schema[] { JOIN_GROUP_REQUEST_V0 };
-    public static Schema[] JOIN_GROUP_RESPONSE = new Schema[] { JOIN_GROUP_RESPONSE_V0 };
+    public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0};
+    public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0};
 
     /* Heartbeat api */
-    public static Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                      STRING,
-                                                                      "The consumer group id."),
-                                                            new Field("group_generation_id",
-                                                                      INT32,
-                                                                      "The generation of the consumer group."),
-                                                            new Field("consumer_id",
-                                                                      STRING,
-                                                                      "The consumer id assigned by the group coordinator."));
+    public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The consumer group id."),
+                                                                 new Field("group_generation_id",
+                                                                           INT32,
+                                                                           "The generation of the consumer group."),
+                                                                 new Field("consumer_id",
+                                                                           STRING,
+                                                                           "The consumer id assigned by the group coordinator."));
 
-    public static Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code",
-                                                                       INT16));
+    public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16));
 
-    public static Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
-    public static Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
+    public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0};
+    public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0};
 
     /* an array of all requests and responses with all schema versions */
-    public static Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
-    public static Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
+    public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
+    public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
 
     /* the latest version of each api */
-    public static short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
+    public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
 
     static {
         REQUESTS[ApiKeys.PRODUCE.id] = PRODUCE_REQUEST;
@@ -401,11 +402,8 @@ public class Protocol {
         /* sanity check that we have the same number of request and response versions for each api */
         for (ApiKeys api : ApiKeys.values())
             if (REQUESTS[api.id].length != RESPONSES[api.id].length)
-                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api "
-                                                + api.name
-                                                + " but "
-                                                + RESPONSES[api.id].length
-                                                + " response versions.");
+                throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name
+                        + " but " + RESPONSES[api.id].length + " response versions.");
     }
 
 }

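Note: the REQUESTS and RESPONSES tables above are indexed first by API key and then by API version, so resolving the schema for a parsed request header reduces to two array lookups. A minimal sketch, assuming the header's api_key/api_version values have already been read (the surrounding parsing code is not part of this patch):

    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Schema;

    public class SchemaLookupSketch {
        // Sketch only: resolve the request schema for a header's
        // api_key / api_version pair via the public arrays above.
        public static Schema requestSchemaFor(short apiKey, short apiVersion) {
            if (apiKey < 0 || apiKey > ApiKeys.MAX_API_KEY)
                throw new IllegalArgumentException("Unknown api key: " + apiKey);
            Schema[] versions = Protocol.REQUESTS[apiKey];
            if (apiVersion < 0 || apiVersion >= versions.length)
                throw new IllegalArgumentException("Unknown version " + apiVersion + " for api key " + apiKey);
            return versions[apiVersion];
        }
    }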
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index ee1f78f..ff89f0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -245,7 +245,7 @@ public class Struct {
     public ByteBuffer[] toBytes() {
         ByteBuffer buffer = ByteBuffer.allocate(sizeOf());
         writeTo(buffer);
-        return new ByteBuffer[] { buffer };
+        return new ByteBuffer[] {buffer};
     }
 
     @Override

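Since toBytes() sizes its buffer with sizeOf() before calling writeTo(), callers get back a single exactly-sized ByteBuffer. A hedged usage sketch against the heartbeat response schema above; the Struct(Schema) constructor and set(name, value) mutator are assumptions here, as this hunk only shows toBytes():

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Struct;

    public class StructToBytesSketch {
        public static void main(String[] args) {
            // Assumed API: Struct(Schema) and set(String, Object) are not
            // shown in this diff; only toBytes()/sizeOf()/writeTo() are.
            Struct heartbeat = new Struct(Protocol.HEARTBEAT_RESPONSE_V0);
            heartbeat.set("error_code", (short) 0);
            // One fully written buffer (position == limit == sizeOf()).
            ByteBuffer[] parts = heartbeat.toBytes();
            System.out.println(parts[0].capacity() + " bytes"); // 2 (one INT16 field)
        }
    }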
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
index c7bd2f8..1c9fbaa 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ByteBufferOutputStream.java
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer;
  */
 public class ByteBufferOutputStream extends OutputStream {
 
-    private static float REALLOCATION_FACTOR = 1.1f;
+    private static final float REALLOCATION_FACTOR = 1.1f;
 
     private ByteBuffer buffer;
 

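REALLOCATION_FACTOR drives grow-on-demand buffering: when a write would overflow, a new ByteBuffer with roughly 10% headroom over the required capacity is allocated and the old contents are copied across. A self-contained sketch of that pattern (not the class's actual method bodies, which this hunk does not touch):

    import java.nio.ByteBuffer;

    public class GrowableBufferSketch {
        private static final float REALLOCATION_FACTOR = 1.1f;

        private ByteBuffer buffer = ByteBuffer.allocate(16);

        // Ensure capacity before writing, growing with ~10% headroom.
        public void write(byte[] bytes) {
            if (buffer.remaining() < bytes.length) {
                int needed = buffer.position() + bytes.length;
                ByteBuffer grown = ByteBuffer.allocate((int) (needed * REALLOCATION_FACTOR));
                buffer.flip();      // switch old buffer to read mode
                grown.put(buffer);  // copy existing contents
                buffer = grown;
            }
            buffer.put(bytes);
        }
    }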
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index d684e68..e570b29 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -34,16 +34,15 @@ public class Compressor {
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
     static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
-    private static float[] typeToRate;
-    private static int MAX_TYPE_ID = -1;
+    private static final float[] TYPE_TO_RATE;
 
     static {
+        int maxTypeId = -1;
+        for (CompressionType type : CompressionType.values())
+            maxTypeId = Math.max(maxTypeId, type.id);
+        TYPE_TO_RATE = new float[maxTypeId + 1];
         for (CompressionType type : CompressionType.values()) {
-            MAX_TYPE_ID = Math.max(MAX_TYPE_ID, type.id);
-        }
-        typeToRate = new float[MAX_TYPE_ID+1];
-        for (CompressionType type : CompressionType.values()) {
-            typeToRate[type.id] = type.rate;
+            TYPE_TO_RATE[type.id] = type.rate;
         }
     }
 
@@ -118,7 +117,7 @@ public class Compressor {
 
             // update the compression ratio
             float compressionRate = (float) buffer.position() / this.writtenUncompressed;
-            typeToRate[type.id] = typeToRate[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
+            TYPE_TO_RATE[type.id] = TYPE_TO_RATE[type.id] * COMPRESSION_RATE_DAMPING_FACTOR +
                 compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR);
         }
     }
@@ -192,7 +191,7 @@ public class Compressor {
             return bufferStream.buffer().position();
         } else {
             // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes
-            return (long) (writtenUncompressed * typeToRate[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
+            return (long) (writtenUncompressed * TYPE_TO_RATE[type.id] * COMPRESSION_RATE_ESTIMATION_FACTOR);
         }
     }
 
@@ -209,8 +208,8 @@ public class Compressor {
                     // dynamically load the snappy class to avoid runtime dependency
                     // on snappy if we are not using it
                     try {
-                        Class SnappyOutputStream = Class.forName("org.xerial.snappy.SnappyOutputStream");
-                        OutputStream stream = (OutputStream) SnappyOutputStream.getConstructor(OutputStream.class, Integer.TYPE)
+                        Class<?> outputStreamClass = Class.forName("org.xerial.snappy.SnappyOutputStream");
+                        OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
                             .newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
@@ -218,7 +217,7 @@ public class Compressor {
                     }
                 case LZ4:
                     try {
-                        Class outputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockOutputStream");
+                        Class<?> outputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
                         OutputStream stream = (OutputStream) outputStreamClass.getConstructor(OutputStream.class)
                             .newInstance(buffer);
                         return new DataOutputStream(stream);
@@ -244,8 +243,8 @@ public class Compressor {
                     // dynamically load the snappy class to avoid runtime dependency
                     // on snappy if we are not using it
                     try {
-                        Class SnappyInputStream = Class.forName("org.xerial.snappy.SnappyInputStream");
-                        InputStream stream = (InputStream) SnappyInputStream.getConstructor(InputStream.class)
+                        Class<?> inputStreamClass = Class.forName("org.xerial.snappy.SnappyInputStream");
+                        InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
                             .newInstance(buffer);
                         return new DataInputStream(stream);
                     } catch (Exception e) {
@@ -254,7 +253,7 @@ public class Compressor {
                 case LZ4:
                     // dynamically load LZ4 class to avoid runtime dependency
                     try {
-                        Class inputStreamClass = Class.forName("org.apache.kafka.common.message.KafkaLZ4BlockInputStream");
+                        Class<?> inputStreamClass = Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
                         InputStream stream = (InputStream) inputStreamClass.getConstructor(InputStream.class)
                             .newInstance(buffer);
                         return new DataInputStream(stream);

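TYPE_TO_RATE keeps a per-codec, exponentially damped estimate of the achieved compression ratio, which estimatedBytesWritten() then pads with the 1.05f estimation factor shown above. A standalone sketch of that moving average; the 0.9f damping value is an assumption, since COMPRESSION_RATE_DAMPING_FACTOR's value does not appear in this hunk:

    public class CompressionRateSketch {
        private static final float DAMPING_FACTOR = 0.9f;      // assumed value
        private static final float ESTIMATION_FACTOR = 1.05f;  // matches the constant above

        private float estimatedRate = 1.0f;

        // Fold a newly observed compressed/uncompressed ratio into the estimate.
        public void observe(long compressedBytes, long uncompressedBytes) {
            float observedRate = (float) compressedBytes / uncompressedBytes;
            estimatedRate = estimatedRate * DAMPING_FACTOR + observedRate * (1 - DAMPING_FACTOR);
        }

        // Conservative size estimate for an in-flight batch, before the
        // compressed stream has been flushed to the underlying buffer.
        public long estimatedBytesWritten(long uncompressedBytes) {
            return (long) (uncompressedBytes * estimatedRate * ESTIMATION_FACTOR);
        }
    }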
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
new file mode 100644
index 0000000..f480da2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Exception;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4SafeDecompressor;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+
+    public static final String PREMATURE_EOS = "Stream ended prematurely";
+    public static final String NOT_SUPPORTED = "Stream unsupported";
+    public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
+    public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
+
+    private final LZ4SafeDecompressor decompressor;
+    private final XXHash32 checksum;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private FLG flg;
+    private BD bd;
+    private int bufferOffset;
+    private int bufferSize;
+    private boolean finished;
+
+    /**
+     * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
+     * 
+     * @param in The stream to decompress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
+        super(in);
+        decompressor = LZ4Factory.fastestInstance().safeDecompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        readHeader();
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[maxBlockSize];
+        bufferOffset = 0;
+        bufferSize = 0;
+        finished = false;
+    }
+
+    /**
+     * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+     * 
+     * @throws IOException
+     */
+    private void readHeader() throws IOException {
+        byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
+
+        // read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
+        bufferOffset = 6;
+        if (in.read(header, 0, bufferOffset) != bufferOffset) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        if (MAGIC != Utils.readUnsignedIntLE(header, bufferOffset - 6)) {
+            throw new IOException(NOT_SUPPORTED);
+        }
+        flg = FLG.fromByte(header[bufferOffset - 2]);
+        bd = BD.fromByte(header[bufferOffset - 1]);
+        // TODO read uncompressed content size, update flg.validate()
+        // TODO read dictionary id, update flg.validate()
+
+        // check stream descriptor hash
+        byte hash = (byte) ((checksum.hash(header, 0, bufferOffset, 0) >> 8) & 0xFF);
+        header[bufferOffset++] = (byte) in.read();
+        if (hash != header[bufferOffset - 1]) {
+            throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+        }
+    }
+
+    /**
+     * Decompresses (if necessary) buffered data, optionally computes and validates an XXHash32 checksum, and writes the
+     * result to a buffer.
+     * 
+     * @throws IOException
+     */
+    private void readBlock() throws IOException {
+        int blockSize = Utils.readUnsignedIntLE(in);
+
+        // Check for EndMark
+        if (blockSize == 0) {
+            finished = true;
+            // TODO implement content checksum, update flg.validate()
+            return;
+        } else if (blockSize > maxBlockSize) {
+            throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
+        }
+
+        boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+        byte[] bufferToRead;
+        if (compressed) {
+            bufferToRead = compressedBuffer;
+        } else {
+            blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
+            bufferToRead = buffer;
+            bufferSize = blockSize;
+        }
+
+        if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+            throw new IOException(PREMATURE_EOS);
+        }
+
+        // verify checksum
+        if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
+            throw new IOException(BLOCK_HASH_MISMATCH);
+        }
+
+        if (compressed) {
+            try {
+                bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+            } catch (LZ4Exception e) {
+                throw new IOException(e);
+            }
+        }
+
+        bufferOffset = 0;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        int value = buffer[bufferOffset++] & 0xFF;
+
+        return value;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        if (finished) {
+            return -1;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return -1;
+        }
+        len = Math.min(len, available());
+        System.arraycopy(buffer, bufferOffset, b, off, len);
+        bufferOffset += len;
+        return len;
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+        if (finished) {
+            return 0;
+        }
+        if (available() == 0) {
+            readBlock();
+        }
+        if (finished) {
+            return 0;
+        }
+        n = Math.min(n, available());
+        bufferOffset += n;
+        return n;
+    }
+
+    @Override
+    public int available() throws IOException {
+        return bufferSize - bufferOffset;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+        throw new RuntimeException("mark not supported");
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+        throw new RuntimeException("reset not supported");
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+}

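Paired with KafkaLZ4BlockOutputStream (the next file in this commit), this class round-trips a framed stream entirely in memory. A hedged usage sketch; the test scaffolding is assumed, only the two stream classes come from this patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.record.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) throws Exception {
            byte[] original = "hello, lz4 frame".getBytes(StandardCharsets.UTF_8);

            // Compress into an in-memory frame with block checksums enabled.
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out =
                new KafkaLZ4BlockOutputStream(sink, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, true);
            out.write(original);
            out.close(); // writes the end mark, then closes the sink

            // Decompress; the constructor reads and validates the frame header.
            KafkaLZ4BlockInputStream in =
                new KafkaLZ4BlockInputStream(new ByteArrayInputStream(sink.toByteArray()));
            byte[] decoded = new byte[original.length];
            int n = in.read(decoded, 0, decoded.length);
            in.close();

            System.out.println(n + " bytes: " + new String(decoded, 0, n, StandardCharsets.UTF_8));
        }
    }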
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
new file mode 100644
index 0000000..6a2231f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockOutputStream.java
@@ -0,0 +1,392 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.record;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.kafka.common.utils.Utils;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.xxhash.XXHash32;
+import net.jpountz.xxhash.XXHashFactory;
+
+/**
+ * A partial implementation of the v1.4.1 LZ4 Frame format.
+ * 
+ * @see <a href="https://docs.google.com/document/d/1Tdxmn5_2e5p1y4PtXkatLndWVb0R8QARJFe6JI4Keuo/edit">LZ4 Framing
+ *      Format Spec</a>
+ */
+public final class KafkaLZ4BlockOutputStream extends FilterOutputStream {
+
+    public static final int MAGIC = 0x184D2204;
+    public static final int LZ4_MAX_HEADER_LENGTH = 19;
+    public static final int LZ4_FRAME_INCOMPRESSIBLE_MASK = 0x80000000;
+
+    public static final String CLOSED_STREAM = "The stream is already closed";
+
+    public static final int BLOCKSIZE_64KB = 4;
+    public static final int BLOCKSIZE_256KB = 5;
+    public static final int BLOCKSIZE_1MB = 6;
+    public static final int BLOCKSIZE_4MB = 7;
+
+    private final LZ4Compressor compressor;
+    private final XXHash32 checksum;
+    private final FLG flg;
+    private final BD bd;
+    private final byte[] buffer;
+    private final byte[] compressedBuffer;
+    private final int maxBlockSize;
+    private int bufferOffset;
+    private boolean finished;
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The output stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64KB, 5=256KB, 6=1MB, 7=4MB. All other
+     *            values will generate an exception
+     * @param blockChecksum Default: false. When true, an XXHash32 checksum is computed and appended to the stream for
+     *            every block of data
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize, boolean blockChecksum) throws IOException {
+        super(out);
+        compressor = LZ4Factory.fastestInstance().fastCompressor();
+        checksum = XXHashFactory.fastestInstance().hash32();
+        bd = new BD(blockSize);
+        flg = new FLG(blockChecksum);
+        bufferOffset = 0;
+        maxBlockSize = bd.getBlockMaximumSize();
+        buffer = new byte[maxBlockSize];
+        compressedBuffer = new byte[compressor.maxCompressedLength(maxBlockSize)];
+        finished = false;
+        writeHeader();
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The stream to compress
+     * @param blockSize Default: 4. The block size used during compression. 4=64KB, 5=256KB, 6=1MB, 7=4MB. All other
+     *            values will generate an exception
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out, int blockSize) throws IOException {
+        this(out, blockSize, false);
+    }
+
+    /**
+     * Create a new {@link OutputStream} that will compress data using the LZ4 algorithm.
+     * 
+     * @param out The output stream to compress
+     * @throws IOException
+     */
+    public KafkaLZ4BlockOutputStream(OutputStream out) throws IOException {
+        this(out, BLOCKSIZE_64KB);
+    }
+
+    /**
+     * Writes the magic number and frame descriptor to the underlying {@link OutputStream}.
+     * 
+     * @throws IOException
+     */
+    private void writeHeader() throws IOException {
+        Utils.writeUnsignedIntLE(buffer, 0, MAGIC);
+        bufferOffset = 4;
+        buffer[bufferOffset++] = flg.toByte();
+        buffer[bufferOffset++] = bd.toByte();
+        // TODO write uncompressed content size, update flg.validate()
+        // TODO write dictionary id, update flg.validate()
+        // compute checksum on all descriptor fields
+        int hash = (checksum.hash(buffer, 0, bufferOffset, 0) >> 8) & 0xFF;
+        buffer[bufferOffset++] = (byte) hash;
+        // write out frame descriptor
+        out.write(buffer, 0, bufferOffset);
+        bufferOffset = 0;
+    }
+
+    /**
+     * Compresses buffered data, optionally computes an XXHash32 checksum, and writes the result to the underlying
+     * {@link OutputStream}.
+     * 
+     * @throws IOException
+     */
+    private void writeBlock() throws IOException {
+        if (bufferOffset == 0) {
+            return;
+        }
+
+        int compressedLength = compressor.compress(buffer, 0, bufferOffset, compressedBuffer, 0);
+        byte[] bufferToWrite = compressedBuffer;
+        int compressMethod = 0;
+
+        // Store block uncompressed if compressed length is greater (incompressible)
+        if (compressedLength >= bufferOffset) {
+            bufferToWrite = buffer;
+            compressedLength = bufferOffset;
+            compressMethod = LZ4_FRAME_INCOMPRESSIBLE_MASK;
+        }
+
+        // Write content
+        Utils.writeUnsignedIntLE(out, compressedLength | compressMethod);
+        out.write(bufferToWrite, 0, compressedLength);
+
+        // Calculate and write block checksum
+        if (flg.isBlockChecksumSet()) {
+            int hash = checksum.hash(bufferToWrite, 0, compressedLength, 0);
+            Utils.writeUnsignedIntLE(out, hash);
+        }
+        bufferOffset = 0;
+    }
+
+    /**
+     * Similar to {@link #writeBlock()}, but writes a 0-length block (without a block checksum) to signal the end of
+     * the block stream.
+     * 
+     * @throws IOException
+     */
+    private void writeEndMark() throws IOException {
+        Utils.writeUnsignedIntLE(out, 0);
+        // TODO implement content checksum, update flg.validate()
+        finished = true;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ensureNotFinished();
+        if (bufferOffset == maxBlockSize) {
+            writeBlock();
+        }
+        buffer[bufferOffset++] = (byte) b;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        net.jpountz.util.Utils.checkRange(b, off, len);
+        ensureNotFinished();
+
+        int bufferRemainingLength = maxBlockSize - bufferOffset;
+        // while the remaining data in b would overflow the buffer, flush full blocks
+        while (len > bufferRemainingLength) {
+            // fill remaining space in buffer
+            System.arraycopy(b, off, buffer, bufferOffset, bufferRemainingLength);
+            bufferOffset = maxBlockSize;
+            writeBlock();
+            // compute new offset and length
+            off += bufferRemainingLength;
+            len -= bufferRemainingLength;
+            bufferRemainingLength = maxBlockSize;
+        }
+
+        System.arraycopy(b, off, buffer, bufferOffset, len);
+        bufferOffset += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!finished) {
+            writeBlock();
+        }
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    /**
+     * A simple state check to ensure the stream is still open.
+     */
+    private void ensureNotFinished() {
+        if (finished) {
+            throw new IllegalStateException(CLOSED_STREAM);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (!finished) {
+            writeEndMark();
+            flush();
+            finished = true;
+        }
+        if (out != null) {
+            out.close();
+            out = null;
+        }
+    }
+
+    public static class FLG {
+
+        private static final int VERSION = 1;
+
+        private final int presetDictionary;
+        private final int reserved1;
+        private final int contentChecksum;
+        private final int contentSize;
+        private final int blockChecksum;
+        private final int blockIndependence;
+        private final int version;
+
+        public FLG() {
+            this(false);
+        }
+
+        public FLG(boolean blockChecksum) {
+            this(0, 0, 0, 0, blockChecksum ? 1 : 0, 1, VERSION);
+        }
+
+        private FLG(int presetDictionary,
+                    int reserved1,
+                    int contentChecksum,
+                    int contentSize,
+                    int blockChecksum,
+                    int blockIndependence,
+                    int version) {
+            this.presetDictionary = presetDictionary;
+            this.reserved1 = reserved1;
+            this.contentChecksum = contentChecksum;
+            this.contentSize = contentSize;
+            this.blockChecksum = blockChecksum;
+            this.blockIndependence = blockIndependence;
+            this.version = version;
+            validate();
+        }
+
+        public static FLG fromByte(byte flg) {
+            int presetDictionary = (flg >>> 0) & 1;
+            int reserved1 = (flg >>> 1) & 1;
+            int contentChecksum = (flg >>> 2) & 1;
+            int contentSize = (flg >>> 3) & 1;
+            int blockChecksum = (flg >>> 4) & 1;
+            int blockIndependence = (flg >>> 5) & 1;
+            int version = (flg >>> 6) & 3;
+
+            return new FLG(presetDictionary,
+                           reserved1,
+                           contentChecksum,
+                           contentSize,
+                           blockChecksum,
+                           blockIndependence,
+                           version);
+        }
+
+        public byte toByte() {
+            return (byte) (((presetDictionary & 1) << 0) | ((reserved1 & 1) << 1) | ((contentChecksum & 1) << 2)
+                    | ((contentSize & 1) << 3) | ((blockChecksum & 1) << 4) | ((blockIndependence & 1) << 5) | ((version & 3) << 6));
+        }
+
+        private void validate() {
+            if (presetDictionary != 0) {
+                throw new RuntimeException("Preset dictionary is unsupported");
+            }
+            if (reserved1 != 0) {
+                throw new RuntimeException("Reserved1 field must be 0");
+            }
+            if (contentChecksum != 0) {
+                throw new RuntimeException("Content checksum is unsupported");
+            }
+            if (contentSize != 0) {
+                throw new RuntimeException("Content size is unsupported");
+            }
+            if (blockIndependence != 1) {
+                throw new RuntimeException("Dependent block stream is unsupported");
+            }
+            if (version != VERSION) {
+                throw new RuntimeException(String.format("Version %d is unsupported", version));
+            }
+        }
+
+        public boolean isPresetDictionarySet() {
+            return presetDictionary == 1;
+        }
+
+        public boolean isContentChecksumSet() {
+            return contentChecksum == 1;
+        }
+
+        public boolean isContentSizeSet() {
+            return contentSize == 1;
+        }
+
+        public boolean isBlockChecksumSet() {
+            return blockChecksum == 1;
+        }
+
+        public boolean isBlockIndependenceSet() {
+            return blockIndependence == 1;
+        }
+
+        public int getVersion() {
+            return version;
+        }
+    }
+
+    public static class BD {
+
+        private final int reserved2;
+        private final int blockSizeValue;
+        private final int reserved3;
+
+        public BD() {
+            this(0, BLOCKSIZE_64KB, 0);
+        }
+
+        public BD(int blockSizeValue) {
+            this(0, blockSizeValue, 0);
+        }
+
+        private BD(int reserved2, int blockSizeValue, int reserved3) {
+            this.reserved2 = reserved2;
+            this.blockSizeValue = blockSizeValue;
+            this.reserved3 = reserved3;
+            validate();
+        }
+
+        public static BD fromByte(byte bd) {
+            int reserved2 = (bd >>> 0) & 15;
+            int blockMaximumSize = (bd >>> 4) & 7;
+            int reserved3 = (bd >>> 7) & 1;
+
+            return new BD(reserved2, blockMaximumSize, reserved3);
+        }
+
+        private void validate() {
+            if (reserved2 != 0) {
+                throw new RuntimeException("Reserved2 field must be 0");
+            }
+            if (blockSizeValue < 4 || blockSizeValue > 7) {
+                throw new RuntimeException("Block size value must be between 4 and 7");
+            }
+            if (reserved3 != 0) {
+                throw new RuntimeException("Reserved3 field must be 0");
+            }
+        }
+
+        // 2^(2n+8)
+        public int getBlockMaximumSize() {
+            return 1 << ((2 * blockSizeValue) + 8);
+        }
+
+        public byte toByte() {
+            return (byte) (((reserved2 & 15) << 0) | ((blockSizeValue & 7) << 4) | ((reserved3 & 1) << 7));
+        }
+    }
+
+}

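The FLG and BD helpers pack the frame descriptor bits as the spec lays them out, and BD.getBlockMaximumSize() expands the 3-bit size code via 2^(2n+8), so codes 4 through 7 map to 64KB, 256KB, 1MB and 4MB. A quick sketch exercising the round-trip (scaffolding assumed, the classes themselves come from this patch):

    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;

    public class FrameDescriptorSketch {
        public static void main(String[] args) {
            // BD: size code 4 -> 1 << (2*4 + 8) = 65536 bytes (64KB).
            BD bd = new BD(KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB);
            System.out.println(bd.getBlockMaximumSize()); // 65536

            // FLG: block checksum flag survives a toByte/fromByte round-trip.
            FLG flg = new FLG(true);
            FLG parsed = FLG.fromByte(flg.toByte());
            System.out.println(parsed.isBlockChecksumSet()); // true
            System.out.println(parsed.getVersion());         // 1
        }
    }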
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c6d5bba/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index cc4084f..083e7a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -164,21 +164,21 @@ public class MemoryRecords implements Records {
     
     @Override
     public String toString() {
-      Iterator<LogEntry> iter = iterator();
-      StringBuilder builder = new StringBuilder();
-      builder.append('[');
-      while(iter.hasNext()) {
-        LogEntry entry = iter.next();
-        builder.append('(');
-        builder.append("offset=");
-        builder.append(entry.offset());
-        builder.append(",");
-        builder.append("record=");
-        builder.append(entry.record());
-        builder.append(")");
-      }
-      builder.append(']');
-      return builder.toString();
+        Iterator<LogEntry> iter = iterator();
+        StringBuilder builder = new StringBuilder();
+        builder.append('[');
+        while (iter.hasNext()) {
+            LogEntry entry = iter.next();
+            builder.append('(');
+            builder.append("offset=");
+            builder.append(entry.offset());
+            builder.append(",");
+            builder.append("record=");
+            builder.append(entry.record());
+            builder.append(")");
+        }
+        builder.append(']');
+        return builder.toString();
     }
 
     public static class RecordsIterator extends AbstractIterator<LogEntry> {
@@ -218,8 +218,8 @@ public class MemoryRecords implements Records {
                     if (type == CompressionType.NONE) {
                         rec = buffer.slice();
                         int newPos = buffer.position() + size;
-                        if(newPos > buffer.limit())
-                          return allDone();
+                        if (newPos > buffer.limit())
+                            return allDone();
                         buffer.position(newPos);
                         rec.limit(size);
                     } else {
@@ -251,7 +251,7 @@ public class MemoryRecords implements Records {
         }
 
         private boolean innerDone() {
-            return (innerIter == null || !innerIter.hasNext());
+            return innerIter == null || !innerIter.hasNext();
         }
     }
 }