You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2016/02/16 20:29:32 UTC

kafka git commit: KAFKA-3088; Make client-id a nullable string and fix handling of invalid requests.

Repository: kafka
Updated Branches:
  refs/heads/0.9.0 23c69d62a -> bb643f83a


KAFKA-3088; Make client-id a nullable string and fix handling of invalid requests.

…ith empty client ID

- Adds  NULLABLE_STRING Type to the protocol
- Changes client_id in the REQUEST_HEADER to NULLABLE_STRING with a default of ""
- Fixes server handling of invalid ApiKey request and other invalid requests

Specifically for 0.9 branch:
- Changes legacy 'readFrom' methods to default client id to "" on read

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Joel Koshy <jj...@gmail.com>

Closes #910 from granthenke/null-clientid-0.9


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bb643f83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bb643f83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bb643f83

Branch: refs/heads/0.9.0
Commit: bb643f83a95e9ebb3a9f8331fe9998c8722c07d4
Parents: 23c69d6
Author: Grant Henke <gr...@gmail.com>
Authored: Tue Feb 16 11:29:03 2016 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Feb 16 11:29:03 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  |   6 +-
 .../kafka/common/protocol/types/Type.java       |  56 ++++++
 .../types/ProtocolSerializationTest.java        |  15 +-
 .../common/requests/RequestResponseTest.java    |  12 ++
 .../src/main/scala/kafka/api/FetchRequest.scala |   4 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |   8 +-
 .../scala/kafka/api/OffsetCommitRequest.scala   |   6 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |   8 +-
 .../main/scala/kafka/api/OffsetRequest.scala    |   6 +-
 .../main/scala/kafka/api/ProducerRequest.scala  |   4 +-
 .../scala/kafka/api/StopReplicaRequest.scala    |   4 +-
 .../scala/kafka/api/TopicMetadataRequest.scala  |   2 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   6 +-
 .../kafka/network/InvalidRequestException.scala |  10 +-
 .../scala/kafka/network/RequestChannel.scala    |   7 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala | 171 +++++++++++++++++++
 16 files changed, 290 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index ff844e7..48c64c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -26,6 +26,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
 
 public class Protocol {
 
@@ -35,8 +36,9 @@ public class Protocol {
                                                                      INT32,
                                                                      "A user-supplied integer value that will be passed back with the response"),
                                                            new Field("client_id",
-                                                                     STRING,
-                                                                     "A user specified identifier for the client making the request."));
+                                                                     NULLABLE_STRING,
+                                                                     "A user specified identifier for the client making the request.",
+                                                                     ""));
 
     public static final Schema RESPONSE_HEADER = new Schema(new Field("correlation_id",
                                                                       INT32,

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index 0483387..c4bcb1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -216,6 +216,62 @@ public abstract class Type {
         }
     };
 
+    public static final Type NULLABLE_STRING = new Type() {
+        @Override
+        public boolean isNullable() {
+            return true;
+        }
+
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            if (o == null) {
+                buffer.putShort((short) -1);
+                return;
+            }
+
+            byte[] bytes = Utils.utf8((String) o);
+            if (bytes.length > Short.MAX_VALUE)
+                throw new SchemaException("String is longer than the maximum string length.");
+            buffer.putShort((short) bytes.length);
+            buffer.put(bytes);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            int length = buffer.getShort();
+            if (length < 0)
+                return null;
+
+            byte[] bytes = new byte[length];
+            buffer.get(bytes);
+            return Utils.utf8(bytes);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            if (o == null)
+                return 2;
+
+            return 2 + Utils.utf8Length((String) o);
+        }
+
+        @Override
+        public String toString() {
+            return "NULLABLE_STRING";
+        }
+
+        @Override
+        public String validate(Object item) {
+            if (item == null)
+                return null;
+
+            if (item instanceof String)
+                return (String) item;
+            else
+                throw new SchemaException(item + " is not a String.");
+        }
+    };
+
     public static final Type BYTES = new Type() {
         @Override
         public void write(ByteBuffer buffer, Object o) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
index 9fe20c1..e20aa10 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java
@@ -38,6 +38,7 @@ public class ProtocolSerializationTest {
                                  new Field("int32", Type.INT32),
                                  new Field("int64", Type.INT64),
                                  new Field("string", Type.STRING),
+                                 new Field("nullable_string", Type.NULLABLE_STRING),
                                  new Field("bytes", Type.BYTES),
                                  new Field("nullable_bytes", Type.NULLABLE_BYTES),
                                  new Field("array", new ArrayOf(Type.INT32)),
@@ -47,6 +48,7 @@ public class ProtocolSerializationTest {
                                              .set("int32", 1)
                                              .set("int64", 1L)
                                              .set("string", "1")
+                                             .set("nullable_string", null)
                                              .set("bytes", ByteBuffer.wrap("1".getBytes()))
                                              .set("nullable_bytes", null)
                                              .set("array", new Object[] {1});
@@ -62,6 +64,9 @@ public class ProtocolSerializationTest {
         check(Type.STRING, "");
         check(Type.STRING, "hello");
         check(Type.STRING, "A\u00ea\u00f1\u00fcC");
+        check(Type.NULLABLE_STRING, null);
+        check(Type.NULLABLE_STRING, "");
+        check(Type.NULLABLE_STRING, "hello");
         check(Type.BYTES, ByteBuffer.allocate(0));
         check(Type.BYTES, ByteBuffer.wrap("abcd".getBytes()));
         check(Type.NULLABLE_BYTES, null);
@@ -99,11 +104,15 @@ public class ProtocolSerializationTest {
 
     @Test
     public void testNullableDefault() {
+        checkNullableDefault(Type.NULLABLE_BYTES, ByteBuffer.allocate(0));
+        checkNullableDefault(Type.NULLABLE_STRING, "default");
+    }
+
+    private void checkNullableDefault(Type type, Object defaultValue) {
         // Should use default even if the field allows null values
-        ByteBuffer empty = ByteBuffer.allocate(0);
-        Schema schema = new Schema(new Field("field", Type.NULLABLE_BYTES, "doc", empty));
+        Schema schema = new Schema(new Field("field", type, "doc", defaultValue));
         Struct struct = new Struct(schema);
-        assertEquals("Should get the default value", empty, struct.get("field"));
+        assertEquals("Should get the default value", defaultValue, struct.get("field"));
         struct.validate(); // should be valid even with missing value
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 69431a5..2b0bb28 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -159,6 +159,18 @@ public class RequestResponseTest {
         assertEquals(response.partitionsRemaining(), deserialized.partitionsRemaining());
     }
 
+    @Test
+    public void testRequestHeaderWithNullClientId() {
+        RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 10);
+        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf());
+        header.writeTo(buffer);
+        buffer.rewind();
+        RequestHeader deserialized = RequestHeader.parse(buffer);
+        assertEquals(header.apiKey(), deserialized.apiKey());
+        assertEquals(header.apiVersion(), deserialized.apiVersion());
+        assertEquals(header.correlationId(), deserialized.correlationId());
+        assertEquals("", deserialized.clientId()); // null is defaulted to ""
+    }
 
     private AbstractRequestResponse createRequestHeader() {
         return new RequestHeader((short) 10, (short) 1, "", 10);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index 36e288f..04ca157 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,7 @@ object FetchRequest {
   def readFrom(buffer: ByteBuffer): FetchRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val replicaId = buffer.getInt
     val maxWait = buffer.getInt
     val minBytes = buffer.getInt

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index c2584e0..86c8e91 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -88,7 +88,7 @@ case class PartitionStateInfo(leaderIsrAndControllerEpoch: LeaderIsrAndControlle
       allReplicas.size * 4
     size
   }
-  
+
   override def toString(): String = {
     val partitionStateInfo = new StringBuilder
     partitionStateInfo.append("(LeaderAndIsrInfo:" + leaderIsrAndControllerEpoch.toString)
@@ -107,7 +107,7 @@ object LeaderAndIsrRequest {
   def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
@@ -164,7 +164,7 @@ case class LeaderAndIsrRequest (versionId: Short,
   def sizeInBytes(): Int = {
     var size =
       2 /* version id */ +
-      4 /* correlation id */ + 
+      4 /* correlation id */ +
       (2 + clientId.length) /* client id */ +
       4 /* controller id */ +
       4 /* controller epoch */ +

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 75067cf..26a582c 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -38,9 +38,9 @@ object OffsetCommitRequest extends Logging {
            "Version " + versionId + " is invalid for OffsetCommitRequest. Valid versions are 0, 1 or 2.")
 
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
 
-    // Read the OffsetRequest 
+    // Read the OffsetRequest
     val groupId = readShortString(buffer)
 
     // version 1 and 2 specific fields

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index a83e147..a906851 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -33,7 +33,7 @@ object OffsetFetchRequest extends Logging {
     // Read values from the envelope
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
 
     // Read the OffsetFetchRequest
     val consumerGroupId = readShortString(buffer)
@@ -58,7 +58,7 @@ case class OffsetFetchRequest(groupId: String,
     extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
-  
+
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -81,7 +81,7 @@ case class OffsetFetchRequest(groupId: String,
     2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
-    shortStringLength(groupId) + 
+    shortStringLength(groupId) +
     4 + /* topic count */
     requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
       count + shortStringLength(t._1) + /* topic */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetRequest.scala b/core/src/main/scala/kafka/api/OffsetRequest.scala
index d2c1c95..6d7ef12 100644
--- a/core/src/main/scala/kafka/api/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -37,7 +37,7 @@ object OffsetRequest {
   def readFrom(buffer: ByteBuffer): OffsetRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val replicaId = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -132,4 +132,4 @@ case class OffsetRequest(requestInfo: Map[TopicAndPartition, PartitionOffsetRequ
       offsetRequest.append("; RequestInfo: " + requestInfo.mkString(","))
     offsetRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/ProducerRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ProducerRequest.scala b/core/src/main/scala/kafka/api/ProducerRequest.scala
index 7fb143e..ce78f78 100644
--- a/core/src/main/scala/kafka/api/ProducerRequest.scala
+++ b/core/src/main/scala/kafka/api/ProducerRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -31,7 +31,7 @@ object ProducerRequest {
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
-    val clientId: String = readShortString(buffer)
+    val clientId: String = Option(readShortString(buffer)).getOrElse("")
     val requiredAcks: Short = buffer.getShort
     val ackTimeoutMs: Int = buffer.getInt
     //build the topic structure

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 4441fc6..42aa5a7 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -34,7 +34,7 @@ object StopReplicaRequest extends Logging {
   def readFrom(buffer: ByteBuffer): StopReplicaRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val deletePartitions = buffer.get match {
@@ -122,4 +122,4 @@ case class StopReplicaRequest(versionId: Short,
       stopReplicaRequest.append("; Partitions: " + partitions.mkString(","))
     stopReplicaRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
index 401c583..1b9ac90 100644
--- a/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
@@ -39,7 +39,7 @@ object TopicMetadataRequest extends Logging {
   def readFrom(buffer: ByteBuffer): TopicMetadataRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val numTopics = readIntInRange(buffer, "number of topics", (0, Int.MaxValue))
     val topics = new ListBuffer[String]()
     for(i <- 0 until numTopics)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 11c32cd..1acfd09 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -36,7 +36,7 @@ object UpdateMetadataRequest {
   def readFrom(buffer: ByteBuffer): UpdateMetadataRequest = {
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
-    val clientId = readShortString(buffer)
+    val clientId = Option(readShortString(buffer)).getOrElse("")
     val controllerId = buffer.getInt
     val controllerEpoch = buffer.getInt
     val partitionStateInfosCount = buffer.getInt
@@ -144,4 +144,4 @@ case class UpdateMetadataRequest (versionId: Short,
       updateMetadataRequest.append(";PartitionState:" + partitionStateInfos.mkString(","))
     updateMetadataRequest.toString()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/network/InvalidRequestException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/InvalidRequestException.scala b/core/src/main/scala/kafka/network/InvalidRequestException.scala
index 5197913..47dba6c 100644
--- a/core/src/main/scala/kafka/network/InvalidRequestException.scala
+++ b/core/src/main/scala/kafka/network/InvalidRequestException.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -17,8 +17,8 @@
 
 package kafka.network
 
-class InvalidRequestException(val message: String) extends RuntimeException(message) {
-  
-  def this() = this("")
-  
+class InvalidRequestException(val message: String, cause: Throwable) extends RuntimeException(message, cause) {
+
+  def this() = this("", null)
+  def this(message: String) = this(message, null)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 4044f62..5c7cf49 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -76,7 +76,12 @@ object RequestChannel extends Logging {
         null
     val body: AbstractRequest =
       if (requestObj == null)
-        AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+        try {
+          AbstractRequest.getRequest(header.apiKey, header.apiVersion, buffer)
+        } catch {
+          case ex: Throwable =>
+            throw new InvalidRequestException(s"Error getting request for apiKey: ${header.apiKey} and apiVersion: ${header.apiVersion}", ex)
+        }
       else
         null
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bb643f83/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
new file mode 100755
index 0000000..155eea0
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala
@@ -0,0 +1,171 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+
+import kafka.integration.KafkaServerTestHarness
+
+import kafka.network.SocketServer
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.types.Type
+import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
+import org.apache.kafka.common.requests.{ProduceResponse, ResponseHeader, ProduceRequest}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class EdgeCaseRequestTest extends KafkaServerTestHarness {
+
+  def generateConfigs() = {
+    val props = TestUtils.createBrokerConfig(1, zkConnect)
+    props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false")
+    List(KafkaConfig.fromProps(props))
+  }
+
+  private def socketServer = servers.head.socketServer
+
+  private def connect(s: SocketServer = socketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): Socket = {
+    new Socket("localhost", s.boundPort(protocol))
+  }
+
+  private def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None) {
+    val outgoing = new DataOutputStream(socket.getOutputStream)
+    id match {
+      case Some(id) =>
+        outgoing.writeInt(request.length + 2)
+        outgoing.writeShort(id)
+      case None =>
+        outgoing.writeInt(request.length)
+    }
+    outgoing.write(request)
+    outgoing.flush()
+  }
+
+  private def receiveResponse(socket: Socket): Array[Byte] = {
+    val incoming = new DataInputStream(socket.getInputStream)
+    val len = incoming.readInt()
+    val response = new Array[Byte](len)
+    incoming.readFully(response)
+    response
+  }
+
+  private def requestAndReceive(request: Array[Byte], id: Option[Short] = None): Array[Byte] = {
+    val plainSocket = connect()
+    try {
+      sendRequest(plainSocket, request, id)
+      receiveResponse(plainSocket)
+    } finally {
+      plainSocket.close()
+    }
+  }
+
+  // Custom header serialization so that protocol assumptions are not forced
+  private def requestHeaderBytes(apiKey: Short, apiVersion: Short, clientId: String = "", correlationId: Int = -1): Array[Byte] = {
+    val size = {
+      2 /* apiKey */ +
+        2 /* version id */ +
+        4 /* correlation id */ +
+        Type.NULLABLE_STRING.sizeOf(clientId) /* client id */
+    }
+
+    val buffer = ByteBuffer.allocate(size)
+    buffer.putShort(apiKey)
+    buffer.putShort(apiVersion)
+    buffer.putInt(correlationId)
+    Type.NULLABLE_STRING.write(buffer, clientId)
+    buffer.array()
+  }
+
+  private def verifyDisconnect(request: Array[Byte]) {
+    val plainSocket = connect()
+    try {
+      sendRequest(plainSocket, requestHeaderBytes(-1, 0))
+      assertEquals("The server should disconnect", -1, plainSocket.getInputStream.read())
+    } finally {
+      plainSocket.close()
+    }
+  }
+
+  @Test
+  def testProduceRequestWithNullClientId() {
+    val topic = "topic"
+    val topicPartition = new TopicPartition(topic, 0)
+    val correlationId = -1
+    TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers)
+
+    val serializedBytes = {
+      val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 1, null, correlationId)
+      val messageBytes = "message".getBytes
+      val request = new ProduceRequest(1, 10000, Map(topicPartition -> ByteBuffer.wrap(messageBytes)).asJava)
+      val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf)
+      byteBuffer.put(headerBytes)
+      request.writeTo(byteBuffer)
+      byteBuffer.array()
+    }
+
+    val response = requestAndReceive(serializedBytes)
+
+    val responseBuffer = ByteBuffer.wrap(response)
+    val responseHeader = ResponseHeader.parse(responseBuffer)
+    val produceResponse = ProduceResponse.parse(responseBuffer)
+
+    assertEquals("The response should parse completely", 0, responseBuffer.remaining())
+    assertEquals("The correlationId should match request", correlationId, responseHeader.correlationId())
+    assertEquals("One partition response should be returned", 1, produceResponse.responses().size())
+
+    val partitionResponse = produceResponse.responses().get(topicPartition)
+    assertNotNull(partitionResponse)
+    assertEquals("There should be no error", 0, partitionResponse.errorCode)
+  }
+
+  @Test
+  def testHeaderOnlyRequest() {
+    verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, 1))
+  }
+
+  @Test
+  def testInvalidApiKeyRequest() {
+    verifyDisconnect(requestHeaderBytes(-1, 0))
+  }
+
+  @Test
+  def testInvalidApiVersionRequest() {
+    verifyDisconnect(requestHeaderBytes(ApiKeys.PRODUCE.id, -1))
+  }
+
+  @Test
+  def testMalformedHeaderRequest() {
+    val serializedBytes = {
+      // Only send apiKey and apiVersion
+      val buffer = ByteBuffer.allocate(
+        2 /* apiKey */ +
+          2 /* apiVersion */
+      )
+      buffer.putShort(ApiKeys.PRODUCE.id)
+      buffer.putShort(1)
+      buffer.array()
+    }
+
+    verifyDisconnect(serializedBytes)
+  }
+}