You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/04/11 08:31:11 UTC

[1/2] kafka git commit: KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)

Repository: kafka
Updated Branches:
  refs/heads/trunk 749e9e14c -> d0e7c6b93


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 2a1f368..8b582ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -39,7 +39,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
@@ -178,19 +178,19 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
+        // COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
         time.sleep(sessionTimeoutMs);
         RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE));
+        client.prepareResponse(heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
         assertTrue(future.isDone());
         assertTrue(future.failed());
-        assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
@@ -205,13 +205,13 @@ public class ConsumerCoordinatorTest {
         assertEquals(1, consumerClient.pendingRequestCount());
         assertFalse(future.isDone());
 
-        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
+        client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR));
         time.sleep(sessionTimeoutMs);
         consumerClient.poll(0);
 
         assertTrue(future.isDone());
         assertTrue(future.failed());
-        assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception());
+        assertEquals(Errors.NOT_COORDINATOR.exception(), future.exception());
         assertTrue(coordinator.coordinatorUnknown());
     }
 
@@ -1121,7 +1121,7 @@ public class ConsumerCoordinatorTest {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
@@ -1135,7 +1135,7 @@ public class ConsumerCoordinatorTest {
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1151,7 +1151,7 @@ public class ConsumerCoordinatorTest {
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
         coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
@@ -1182,7 +1182,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR_FOR_GROUP)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
@@ -1194,7 +1194,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)));
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
@@ -1307,7 +1307,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(Errors.GROUP_LOAD_IN_PROGRESS));
+        client.prepareResponse(offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());
@@ -1348,7 +1348,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(singleton(t1p));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
+        client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         client.prepareResponse(offsetFetchResponse(t1p, Errors.NONE, "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
@@ -1414,7 +1414,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1422,14 +1422,14 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP);
+        makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1437,14 +1437,14 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         closeVerifyTimeout(coordinator, 1000, 60000, 0, 0);
     }
 
     @Test
     public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000);
     }
@@ -1452,7 +1452,7 @@ public class ConsumerCoordinatorTest {
     @Test
     public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
         ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true);
-        makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
+        makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
         time.sleep(autoCommitIntervalMs);
         closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000);
     }
@@ -1608,8 +1608,8 @@ public class ConsumerCoordinatorTest {
                 leaveGroup);
     }
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2e1a79d..e41c38e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -18,7 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -63,9 +63,12 @@ public class RequestResponseTest {
 
     @Test
     public void testSerialization() throws Exception {
-        checkRequest(createGroupCoordinatorRequest());
-        checkErrorResponse(createGroupCoordinatorRequest(), new UnknownServerException());
-        checkResponse(createGroupCoordinatorResponse(), 0);
+        checkRequest(createFindCoordinatorRequest(0));
+        checkRequest(createFindCoordinatorRequest(1));
+        checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException());
+        checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException());
+        checkResponse(createFindCoordinatorResponse(), 0);
+        checkResponse(createFindCoordinatorResponse(), 1);
         checkRequest(createControlledShutdownRequest());
         checkResponse(createControlledShutdownResponse(), 1);
         checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException());
@@ -100,7 +103,7 @@ public class RequestResponseTest {
         checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
         checkResponse(createOffsetCommitResponse(), 0);
         checkRequest(OffsetFetchRequest.forAllPartitions("group1"));
-        checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorForGroupException());
+        checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"));
         checkRequest(createOffsetFetchRequest(0));
         checkRequest(createOffsetFetchRequest(1));
         checkRequest(createOffsetFetchRequest(2));
@@ -262,6 +265,12 @@ public class RequestResponseTest {
         return buffer;
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void cannotUseFindCoordinatorV0ToFindTransactionCoordinator() {
+        FindCoordinatorRequest.Builder builder = new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.TRANSACTION, "foobar");
+        builder.build((short) 0);
+    }
+
     @Test
     public void produceRequestToStringTest() {
         ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
@@ -495,12 +504,13 @@ public class RequestResponseTest {
         return new ResponseHeader(10);
     }
 
-    private GroupCoordinatorRequest createGroupCoordinatorRequest() {
-        return new GroupCoordinatorRequest.Builder("test-group").build();
+    private FindCoordinatorRequest createFindCoordinatorRequest(int version) {
+        return new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, "test-group")
+                .build((short) version);
     }
 
-    private GroupCoordinatorResponse createGroupCoordinatorResponse() {
-        return new GroupCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
+    private FindCoordinatorResponse createFindCoordinatorResponse() {
+        return new FindCoordinatorResponse(Errors.NONE, new Node(10, "host1", 2014));
     }
 
     private FetchRequest createFetchRequest(int version) {
@@ -827,7 +837,7 @@ public class RequestResponseTest {
     }
 
     private InitPidRequest createInitPidRequest() {
-        return new InitPidRequest.Builder(null).build();
+        return new InitPidRequest.Builder(null, 100).build();
     }
 
     private InitPidResponse createInitPidResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 977cc21..ab042de 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest.ProtocolMetadata;
 import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
@@ -459,8 +459,8 @@ public class WorkerCoordinatorTest {
     }
 
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
index a837e66..f734032 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -384,7 +384,7 @@ public class KafkaBasedLogTest {
                 consumer.schedulePollTask(new Runnable() {
                     @Override
                     public void run() {
-                        consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+                        consumer.setException(Errors.COORDINATOR_NOT_AVAILABLE.exception());
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 8a9660b..4d218c1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -100,15 +100,15 @@ class AdminClient(val time: Time,
 
   def findCoordinator(groupId: String, timeoutMs: Long = 0): Node = {
     val startTime = time.milliseconds
-    val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
-    var response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+    val requestBuilder = new FindCoordinatorRequest.Builder(org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType.GROUP, groupId)
+    var response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
 
-    while (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
+    while (response.error == Errors.COORDINATOR_NOT_AVAILABLE && time.milliseconds - startTime < timeoutMs) {
       Thread.sleep(retryBackoffMs)
-      response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
+      response = sendAnyNode(ApiKeys.FIND_COORDINATOR, requestBuilder).asInstanceOf[FindCoordinatorResponse]
     }
 
-    if (response.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+    if (response.error == Errors.COORDINATOR_NOT_AVAILABLE)
       throw new TimeoutException("The consumer group command timed out while waiting for group to initialize: ", response.error.exception)
 
     response.error.maybeThrow()

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
index 2082e44..d99474d 100644
--- a/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
+++ b/core/src/main/scala/kafka/api/GroupCoordinatorRequest.scala
@@ -44,7 +44,7 @@ case class GroupCoordinatorRequest(group: String,
                                    versionId: Short = GroupCoordinatorRequest.CurrentVersion,
                                    correlationId: Int = 0,
                                    clientId: String = GroupCoordinatorRequest.DefaultClientId)
-  extends RequestOrResponse(Some(ApiKeys.GROUP_COORDINATOR.id)) {
+  extends RequestOrResponse(Some(ApiKeys.FIND_COORDINATOR.id)) {
 
   def sizeInBytes =
     2 + /* versionId */
@@ -64,7 +64,7 @@ case class GroupCoordinatorRequest(group: String,
 
   override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     // return ConsumerCoordinatorNotAvailable for all uncaught errors
-    val errorResponse = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, correlationId)
+    val errorResponse = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, correlationId)
     requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index e0aa46d..2cf9bb4 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -63,10 +63,10 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors
 
 object OffsetMetadataAndError {
   val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
-  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS)
+  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_LOAD_IN_PROGRESS)
   val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP)
-  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR)
+  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_NOT_AVAILABLE)
   val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
   val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c5ad94a..b810f81 100755
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -370,8 +370,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                       (error != Errors.NONE && error != Errors.OFFSET_METADATA_TOO_LARGE),
 
                     folded._3 || // update shouldRefreshCoordinator
-                      error == Errors.NOT_COORDINATOR_FOR_GROUP ||
-                      error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE,
+                      error == Errors.NOT_COORDINATOR ||
+                      error == Errors.COORDINATOR_NOT_AVAILABLE,
 
                     // update error count
                     folded._4 + (if (error != Errors.NONE) 1 else 0))
@@ -444,8 +444,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
             val (leaderChanged, loadInProgress) =
               offsetFetchResponse.requestInfo.values.foldLeft(false, false) { case (folded, offsetMetadataAndError) =>
-                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR_FOR_GROUP),
-                 folded._2 || (offsetMetadataAndError.error == Errors.GROUP_LOAD_IN_PROGRESS))
+                (folded._1 || (offsetMetadataAndError.error == Errors.NOT_COORDINATOR),
+                 folded._2 || (offsetMetadataAndError.error == Errors.COORDINATOR_LOAD_IN_PROGRESS))
               }
 
             if (leaderChanged) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 70f4724..d78d1df 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -99,13 +99,13 @@ class GroupCoordinator(val brokerId: Int,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback) {
     if (!isActive.get) {
-      responseCallback(joinError(memberId, Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
+      responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
     } else if (!validGroupId(groupId)) {
       responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS))
+      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
     } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
                sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
       responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
@@ -225,9 +225,9 @@ class GroupCoordinator(val brokerId: Int,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback) {
     if (!isActive.get) {
-      responseCallback(Array.empty, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP)
+      responseCallback(Array.empty, Errors.NOT_COORDINATOR)
     } else {
       groupManager.getGroup(groupId) match {
         case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
@@ -302,11 +302,11 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(Errors.GROUP_LOAD_IN_PROGRESS)
+      responseCallback(Errors.NOT_COORDINATOR)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -336,10 +336,10 @@ class GroupCoordinator(val brokerId: Int,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
     if (!isActive.get) {
-      responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
+      responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      responseCallback(Errors.NOT_COORDINATOR)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
       // the group is still loading, so respond just blindly
       responseCallback(Errors.NONE)
     } else {
@@ -399,11 +399,11 @@ class GroupCoordinator(val brokerId: Int,
                           offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                           responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
     if (!isActive.get) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP))
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS))
+      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
     } else {
       groupManager.getGroup(groupId) match {
         case None =>
@@ -457,12 +457,12 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String,
                          partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
     if (!isActive.get)
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Map())
+      (Errors.COORDINATOR_NOT_AVAILABLE, Map())
     else if (!isCoordinatorForGroup(groupId)) {
       debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
-      (Errors.NOT_COORDINATOR_FOR_GROUP, Map())
-    } else if (isCoordinatorLoadingInProgress(groupId))
-      (Errors.GROUP_LOAD_IN_PROGRESS, Map())
+      (Errors.NOT_COORDINATOR, Map())
+    } else if (isCoordinatorLoadInProgress(groupId))
+      (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
     else {
       // return offsets blindly regardless the current group state since the group may be using
       // Kafka commit storage without automatic group management
@@ -472,20 +472,20 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleListGroups(): (Errors, List[GroupOverview]) = {
     if (!isActive.get) {
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
+      (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
     } else {
-      val error = if (groupManager.isLoading()) Errors.GROUP_LOAD_IN_PROGRESS else Errors.NONE
-      (error, groupManager.currentGroups.map(_.overview).toList)
+      val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
+      (errorCode, groupManager.currentGroups.map(_.overview).toList)
     }
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
     if (!isActive.get) {
-      (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
+      (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
     } else if (!isCoordinatorForGroup(groupId)) {
-      (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
-    } else if (isCoordinatorLoadingInProgress(groupId)) {
-      (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
+      (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
+    } else if (isCoordinatorLoadInProgress(groupId)) {
+      (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
     } else {
       groupManager.getGroup(groupId) match {
         case None => (Errors.NONE, GroupCoordinator.DeadGroup)
@@ -512,7 +512,7 @@ class GroupCoordinator(val brokerId: Int,
         case PreparingRebalance =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingJoinCallback != null) {
-              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR_FOR_GROUP))
+              member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
               member.awaitingJoinCallback = null
             }
           }
@@ -521,7 +521,7 @@ class GroupCoordinator(val brokerId: Int,
         case Stable | AwaitingSync =>
           for (member <- group.allMemberMetadata) {
             if (member.awaitingSyncCallback != null) {
-              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR_FOR_GROUP)
+              member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
               member.awaitingSyncCallback = null
             }
             heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
@@ -754,7 +754,7 @@ class GroupCoordinator(val brokerId: Int,
 
   private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
 
-  private def isCoordinatorLoadingInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
+  private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
 }
 
 object GroupCoordinator {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2bc0c21..97dc7bc 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -184,10 +184,10 @@ class GroupMetadataManager(val brokerId: Int,
               case Errors.UNKNOWN_TOPIC_OR_PARTITION
                    | Errors.NOT_ENOUGH_REPLICAS
                    | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                Errors.COORDINATOR_NOT_AVAILABLE
 
               case Errors.NOT_LEADER_FOR_PARTITION =>
-                Errors.NOT_COORDINATOR_FOR_GROUP
+                Errors.NOT_COORDINATOR
 
               case Errors.REQUEST_TIMED_OUT =>
                 Errors.REBALANCE_IN_PROGRESS
@@ -214,7 +214,7 @@ class GroupMetadataManager(val brokerId: Int,
         Some(DelayedStore(groupMetadataRecords, putCacheCallback))
 
       case None =>
-        responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP)
+        responseCallback(Errors.NOT_COORDINATOR)
         None
     }
   }
@@ -300,10 +300,10 @@ class GroupMetadataManager(val brokerId: Int,
                   case Errors.UNKNOWN_TOPIC_OR_PARTITION
                        | Errors.NOT_ENOUGH_REPLICAS
                        | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
-                    Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+                    Errors.COORDINATOR_NOT_AVAILABLE
 
                   case Errors.NOT_LEADER_FOR_PARTITION =>
-                    Errors.NOT_COORDINATOR_FOR_GROUP
+                    Errors.NOT_COORDINATOR
 
                   case Errors.MESSAGE_TOO_LARGE
                        | Errors.RECORD_LIST_TOO_LARGE
@@ -335,7 +335,7 @@ class GroupMetadataManager(val brokerId: Int,
 
         case None =>
           val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
-            (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP)
+            (topicPartition, Errors.NOT_COORDINATOR)
           }
           responseCallback(commitStatus)
           None

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
index 41b4323..eced9fb 100644
--- a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
@@ -59,7 +59,7 @@ class TransactionCoordinator(val brokerId: Int,
       responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
     } else {
       // check if it is the assigned coordinator for the transactional id
-      responseCallback(initPidError(Errors.NOT_COORDINATOR_FOR_GROUP))
+      responseCallback(initPidError(Errors.NOT_COORDINATOR))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c75e1b9..e6e2d4d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -35,7 +35,7 @@ import kafka.network.RequestChannel.{Request, Response, Session}
 import kafka.security.auth
 import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, Group, Operation, Read, Resource, Write}
 import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils}
-import org.apache.kafka.common.errors.{ClusterAuthorizationException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -90,7 +90,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
         case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
         case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
-        case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
+        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
         case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
         case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
         case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
@@ -796,7 +796,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " +
         s"'${KafkaConfig.OffsetsTopicReplicationFactorProp}'). This error can be ignored if the cluster is starting up " +
         s"and not all brokers are up yet.")
-      new MetadataResponse.TopicMetadata(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
+      new MetadataResponse.TopicMetadata(Errors.COORDINATOR_NOT_AVAILABLE, Topic.GroupMetadataTopicName, true,
         java.util.Collections.emptyList())
     } else {
       createTopic(Topic.GroupMetadataTopicName, config.offsetsTopicPartitions,
@@ -818,7 +818,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
         if (topic == Topic.GroupMetadataTopicName) {
           val topicMetadata = createGroupMetadataTopic()
-          if (topicMetadata.error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE) {
+          if (topicMetadata.error == Errors.COORDINATOR_NOT_AVAILABLE) {
             new MetadataResponse.TopicMetadata(Errors.INVALID_REPLICATION_FACTOR, topic, Topic.isInternal(topic),
               java.util.Collections.emptyList())
           } else topicMetadata
@@ -981,34 +981,44 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new Response(request, offsetFetchResponse))
   }
 
-  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
-    val groupCoordinatorRequest = request.body[GroupCoordinatorRequest]
+  def handleFindCoordinatorRequest(request: RequestChannel.Request) {
+    val findCoordinatorRequest = request.body[FindCoordinatorRequest]
 
-    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
-      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
+    if (findCoordinatorRequest.coordinatorType == FindCoordinatorRequest.CoordinatorType.GROUP &&
+      !authorize(request.session, Describe, new Resource(Group, findCoordinatorRequest.coordinatorKey))) {
+
+      val responseBody = new FindCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED, Node.noNode)
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     } else {
-      val partition = groupCoordinator.partitionFor(groupCoordinatorRequest.groupId)
+      // TODO: Authorize by transactional id if coordinator type is TRANSACTION
 
       // get metadata (and create the topic if necessary)
-      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.listenerName)
+      val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match {
+        case FindCoordinatorRequest.CoordinatorType.GROUP =>
+          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey)
+          val metadata = getOrCreateGroupMetadataTopic(request.listenerName)
+          (partition, metadata)
+
+        case _ =>
+          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+      }
 
-      val responseBody = if (offsetsTopicMetadata.error != Errors.NONE) {
-        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
+      val responseBody = if (topicMetadata.error != Errors.NONE) {
+        new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
       } else {
-        val coordinatorEndpoint = offsetsTopicMetadata.partitionMetadata().asScala
+        val coordinatorEndpoint = topicMetadata.partitionMetadata.asScala
           .find(_.partition == partition)
           .map(_.leader())
 
         coordinatorEndpoint match {
           case Some(endpoint) if !endpoint.isEmpty =>
-            new GroupCoordinatorResponse(Errors.NONE, endpoint)
+            new FindCoordinatorResponse(Errors.NONE, endpoint)
           case _ =>
-            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode)
+            new FindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
         }
       }
 
-      trace("Sending consumer metadata %s for correlation id %d to client %s."
+      trace("Sending FindCoordinator response %s for correlation id %d to client %s."
         .format(responseBody, request.header.correlationId, request.header.clientId))
       requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ec3eb88..1300629 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -92,7 +92,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.LIST_OFFSETS -> classOf[requests.ListOffsetResponse],
       ApiKeys.OFFSET_COMMIT -> classOf[requests.OffsetCommitResponse],
       ApiKeys.OFFSET_FETCH -> classOf[requests.OffsetFetchResponse],
-      ApiKeys.GROUP_COORDINATOR -> classOf[requests.GroupCoordinatorResponse],
+      ApiKeys.FIND_COORDINATOR -> classOf[FindCoordinatorResponse],
       ApiKeys.UPDATE_METADATA_KEY -> classOf[requests.UpdateMetadataResponse],
       ApiKeys.JOIN_GROUP -> classOf[JoinGroupResponse],
       ApiKeys.SYNC_GROUP -> classOf[SyncGroupResponse],
@@ -113,7 +113,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LIST_OFFSETS -> ((resp: requests.ListOffsetResponse) => resp.responseData().asScala.find(_._1 == tp).get._2.error),
     ApiKeys.OFFSET_COMMIT -> ((resp: requests.OffsetCommitResponse) => resp.responseData().asScala.find(_._1 == tp).get._2),
     ApiKeys.OFFSET_FETCH -> ((resp: requests.OffsetFetchResponse) => resp.error),
-    ApiKeys.GROUP_COORDINATOR -> ((resp: requests.GroupCoordinatorResponse) => resp.error),
+    ApiKeys.FIND_COORDINATOR -> ((resp: FindCoordinatorResponse) => resp.error),
     ApiKeys.UPDATE_METADATA_KEY -> ((resp: requests.UpdateMetadataResponse) => resp.error),
     ApiKeys.JOIN_GROUP -> ((resp: JoinGroupResponse) => resp.error),
     ApiKeys.SYNC_GROUP -> ((resp: SyncGroupResponse) => resp.error),
@@ -134,7 +134,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.LIST_OFFSETS -> TopicDescribeAcl,
     ApiKeys.OFFSET_COMMIT -> (TopicReadAcl ++ GroupReadAcl),
     ApiKeys.OFFSET_FETCH -> (TopicReadAcl ++ GroupReadAcl),
-    ApiKeys.GROUP_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
+    ApiKeys.FIND_COORDINATOR -> (TopicReadAcl ++ GroupReadAcl),
     ApiKeys.UPDATE_METADATA_KEY -> ClusterAcl,
     ApiKeys.JOIN_GROUP -> GroupReadAcl,
     ApiKeys.SYNC_GROUP -> GroupReadAcl,
@@ -212,8 +212,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     new requests.OffsetFetchRequest.Builder(group, List(tp).asJava).build()
   }
 
-  private def createGroupCoordinatorRequest = {
-    new requests.GroupCoordinatorRequest.Builder(group).build()
+  private def createFindCoordinatorRequest = {
+    new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, group).build()
   }
 
   private def createUpdateMetadataRequest = {
@@ -281,7 +281,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.FETCH -> createFetchRequest,
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
       ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
-      ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest,
+      ApiKeys.FIND_COORDINATOR -> createFindCoordinatorRequest,
       ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest,
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
index 905d113..2bceeaa 100644
--- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
@@ -26,7 +26,6 @@ import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Before
 import org.junit.Test
-
 import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions
 import kafka.admin.ConsumerGroupCommand.KafkaConsumerGroupService
 import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService
@@ -37,9 +36,8 @@ import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 
 import org.apache.kafka.clients.consumer.KafkaConsumer
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.errors.{CoordinatorNotAvailableException, WakeupException}
 import org.apache.kafka.common.serialization.StringDeserializer
 
 
@@ -184,6 +182,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val (state, assignments) = consumerGroupCommand.describeGroup()
     assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List()))
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -203,9 +202,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
+    }, "Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -218,9 +218,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-        val (state, _) = consumerGroupCommand.describeGroup()
-        state == Some("Stable")
-      }, "Expected the group to initially become stable.")
+      val (state, _) = consumerGroupCommand.describeGroup()
+      state == Some("Stable")
+    }, "Expected the group to initially become stable.")
 
     // stop the consumer so the group has no active member anymore
     executor.shutdown()
@@ -233,7 +233,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.filter(_.group == group).head.consumerId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) && // the member should be gone
         assignments.get.filter(_.group == group).head.clientId.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE) &&
         assignments.get.filter(_.group == group).head.host.exists(_.trim == ConsumerGroupCommand.MISSING_COLUMN_VALUE)
-      }, "Expected no active member in describe group results.")
+    }, "Expected no active member in describe group results.")
 
     consumerGroupCommand.close()
   }
@@ -254,9 +254,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
         assignments.get.count(_.group == group) == 2 &&
         assignments.get.count{ x => x.group == group && x.partition.isDefined} == 1 &&
         assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 1
-      }, "Expected rows for consumers with no assigned partitions in describe group results.")
+    }, "Expected rows for consumers with no assigned partitions in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -272,15 +273,16 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     TestUtils.waitUntilTrue(() => {
-          val (state, assignments) = consumerGroupCommand.describeGroup()
-          state == Some("Stable") &&
-          assignments.isDefined &&
-          assignments.get.count(_.group == group) == 2 &&
-          assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
-          assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
-      }, "Expected two rows (one row per consumer) in describe group results.")
+      val (state, assignments) = consumerGroupCommand.describeGroup()
+      state == Some("Stable") &&
+      assignments.isDefined &&
+      assignments.get.count(_.group == group) == 2 &&
+      assignments.get.count{ x => x.group == group && x.partition.isDefined} == 2 &&
+      assignments.get.count{ x => x.group == group && !x.partition.isDefined} == 0
+    }, "Expected two rows (one row per consumer) in describe group results.")
 
     consumerGroupCommand.close()
+    executor.shutdown()
   }
 
   @Test
@@ -294,16 +296,17 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness {
     val consumerGroupCommand = new KafkaConsumerGroupService(opts)
 
     try {
-      val (state, assignments) = consumerGroupCommand.describeGroup()
+      consumerGroupCommand.describeGroup()
       fail("The consumer group command should fail due to low initialization timeout")
     } catch {
-      case e: TimeoutException =>
+      case _: TimeoutException =>
         // OK
       case e: Throwable =>
         fail("An unexpected exception occurred: " + e.getMessage)
         throw e
     } finally {
       consumerGroupCommand.close()
+      executor.shutdown()
     }
   }
 }
@@ -341,7 +344,7 @@ class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String,
   for (i <- 1 to numConsumers) {
     val consumer = new ConsumerThread(broker, i, groupId, topic)
     consumers ++= List(consumer)
-    executor.submit(consumer);
+    executor.submit(consumer)
   }
 
   Runtime.getRuntime().addShutdownHook(new Thread() {
@@ -352,7 +355,7 @@ class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String,
 
   def shutdown() {
     consumers.foreach(_.shutdown)
-    executor.shutdown();
+    executor.shutdown()
     try {
       executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
     } catch {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 5342dac..90b67ba 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -175,7 +175,7 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
   private val consumerMetadataRequest = SerializationTestUtils.createConsumerMetadataRequest
   private val consumerMetadataResponse = SerializationTestUtils.createConsumerMetadataResponse
-  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.GROUP_COORDINATOR_NOT_AVAILABLE, 0)
+  private val consumerMetadataResponseNoCoordinator = GroupCoordinatorResponse(None, Errors.COORDINATOR_NOT_AVAILABLE, 0)
 
   @Test
   def testSerializationAndDeserialization() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 61199c2..ccbba5c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -114,7 +114,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
     val joinGroupError = joinGroupResult.error
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, joinGroupError)
+    assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
   }
 
   @Test
@@ -204,7 +204,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testHeartbeatWrongCoordinator() {
 
     val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, heartbeatResult)
+    assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
   }
 
   @Test
@@ -471,7 +471,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val generation = 1
 
     val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, syncGroupResult._2)
+    assertEquals(Errors.NOT_COORDINATOR, syncGroupResult._2)
   }
 
   @Test
@@ -771,7 +771,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testFetchOffsetNotCoordinatorForGroup(): Unit = {
     val tp = new TopicPartition("topic", 0)
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(otherGroupId, Some(Seq(tp)))
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+    assertEquals(Errors.NOT_COORDINATOR, error)
     assertTrue(partitionData.isEmpty)
   }
 
@@ -864,7 +864,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
 
     val leaveGroupResult = leaveGroup(otherGroupId, memberId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, leaveGroupResult)
+    assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult)
   }
 
   @Test
@@ -937,7 +937,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   def testDescribeGroupWrongCoordinator() {
     EasyMock.reset(replicaManager)
     val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId)
-    assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP, error)
+    assertEquals(Errors.NOT_COORDINATOR, error)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 9d38485..b1f68bb 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -259,10 +259,10 @@ class GroupMetadataManagerTest {
   @Test
   def testStoreGroupErrorMapping() {
     assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
-    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
     assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
     assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
     assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
@@ -343,7 +343,7 @@ class GroupMetadataManagerTest {
     }
 
     groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
     EasyMock.verify(replicaManager)
   }
 
@@ -414,16 +414,16 @@ class GroupMetadataManagerTest {
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP), maybeError)
+    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
     EasyMock.verify(replicaManager)
   }
 
   @Test
   def testCommitOffsetFailure() {
-    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.GROUP_COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR_FOR_GROUP)
+    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
+    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
     assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)


[2/2] kafka git commit: KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)

Posted by ij...@apache.org.
KAFKA-5043; Rename GroupCoordinator to FindCoordinator (KIP-98)

Also:
1. FindCoordinator is more general and takes a coordinator_type
so that it can be used for the group and transaction coordinators.
2. Include an error message in FindCoordinatorResponse to make the
errors at the client side more informative. This PR only adds the
field to the protocol; a subsequent PR will update the code to
use it.
3. Rename `Errors` names for FindCoordinator to be more generic. This
is a compatible change as the ids remain the same.
4. Since the exception classes for the error codes are in a public
package, we introduce new ones and deprecate the old ones.
The classes were not thrown back to the user (KAFKA-5052 aside),
so this is a compatible change.
5. Update InitPidRequest for transactions. Since this protocol API
was introduced recently and is not used by default, we did not bump
its version.

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2825 from apurvam/exactly-once-rpc-stubs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0e7c6b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0e7c6b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0e7c6b9

Branch: refs/heads/trunk
Commit: d0e7c6b9304b23ced046934c799df0cba39c28e5
Parents: 749e9e1
Author: Apurva Mehta <ap...@confluent.io>
Authored: Tue Apr 11 09:05:09 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Apr 11 09:11:46 2017 +0100

----------------------------------------------------------------------
 .../RetriableCommitFailedException.java         |   7 +-
 .../consumer/internals/AbstractCoordinator.java |  36 ++---
 .../consumer/internals/ConsumerCoordinator.java |  15 +-
 .../consumer/internals/RequestFuture.java       |   2 +-
 .../CoordinatorLoadInProgressException.java     |  39 +++++
 .../CoordinatorNotAvailableException.java       |  43 ++++++
 .../GroupCoordinatorNotAvailableException.java  |   5 +-
 .../errors/GroupLoadInProgressException.java    |   5 +-
 .../errors/InvalidTxnTimeoutException.java      |  33 +++++
 .../common/errors/NotCoordinatorException.java  |  38 +++++
 .../errors/NotCoordinatorForGroupException.java |   5 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |   2 +-
 .../apache/kafka/common/protocol/Errors.java    |  24 +--
 .../apache/kafka/common/protocol/Protocol.java  |  57 +++++---
 .../kafka/common/requests/AbstractRequest.java  |   4 +-
 .../kafka/common/requests/AbstractResponse.java |   4 +-
 .../common/requests/DescribeGroupsResponse.java |   6 +-
 .../common/requests/FindCoordinatorRequest.java | 145 +++++++++++++++++++
 .../requests/FindCoordinatorResponse.java       |  95 ++++++++++++
 .../requests/GroupCoordinatorRequest.java       |  89 ------------
 .../requests/GroupCoordinatorResponse.java      |  85 -----------
 .../common/requests/HeartbeatResponse.java      |   2 +-
 .../kafka/common/requests/InitPidRequest.java   |  31 +++-
 .../common/requests/JoinGroupResponse.java      |   4 +-
 .../common/requests/ListGroupsResponse.java     |   2 +-
 .../common/requests/OffsetCommitResponse.java   |   4 +-
 .../common/requests/OffsetFetchResponse.java    |   6 +-
 .../common/requests/SyncGroupResponse.java      |   2 +-
 .../clients/consumer/KafkaConsumerTest.java     |  14 +-
 .../internals/AbstractCoordinatorTest.java      |   6 +-
 .../internals/ConsumerCoordinatorTest.java      |  42 +++---
 .../common/requests/RequestResponseTest.java    |  30 ++--
 .../distributed/WorkerCoordinatorTest.java      |   6 +-
 .../kafka/connect/util/KafkaBasedLogTest.java   |   2 +-
 .../main/scala/kafka/admin/AdminClient.scala    |  10 +-
 .../kafka/api/GroupCoordinatorRequest.scala     |   4 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   6 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   8 +-
 .../kafka/coordinator/GroupCoordinator.scala    |  62 ++++----
 .../coordinator/GroupMetadataManager.scala      |  12 +-
 .../coordinator/TransactionCoordinator.scala    |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  42 ++++--
 .../kafka/api/AuthorizerIntegrationTest.scala   |  12 +-
 .../kafka/admin/DescribeConsumerGroupTest.scala |  43 +++---
 .../api/RequestResponseSerializationTest.scala  |   2 +-
 .../GroupCoordinatorResponseTest.scala          |  12 +-
 .../coordinator/GroupMetadataManagerTest.scala  |  20 +--
 47 files changed, 717 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 510362a..69f21a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -22,12 +22,17 @@ public class RetriableCommitFailedException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 
+    public static RetriableCommitFailedException withUnderlyingMessage(String additionalMessage) {
+        return new RetriableCommitFailedException("Offset commit failed with a retriable exception. " +
+                "You should retry committing offsets. The underlying error was: " + additionalMessage);
+    }
+
     public RetriableCommitFailedException(Throwable t) {
         super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
     }
 
     public RetriableCommitFailedException(String message) {
-        super("Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: " + message);
+        super(message);
     }
 
     public RetriableCommitFailedException(String message, Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index ffafddc..d0de4bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
@@ -35,8 +35,8 @@ import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.GroupCoordinatorRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -465,7 +465,7 @@ public abstract class AbstractCoordinator implements Closeable {
                         }
                     }
                 }
-            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+            } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                 log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                         coordinator());
                 // backoff and retry
@@ -475,8 +475,8 @@ public abstract class AbstractCoordinator implements Closeable {
                 resetGeneration();
                 log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
-            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR) {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
                 log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
@@ -550,8 +550,8 @@ public abstract class AbstractCoordinator implements Closeable {
                     log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     resetGeneration();
                     future.raise(error);
-                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR) {
                     log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                     coordinatorDead();
                     future.raise(error);
@@ -570,8 +570,8 @@ public abstract class AbstractCoordinator implements Closeable {
     private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
         log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node);
-        GroupCoordinatorRequest.Builder requestBuilder =
-                new GroupCoordinatorRequest.Builder(this.groupId);
+        FindCoordinatorRequest.Builder requestBuilder =
+                new FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, this.groupId);
         return client.send(node, requestBuilder)
                      .compose(new GroupCoordinatorResponseHandler());
     }
@@ -582,18 +582,18 @@ public abstract class AbstractCoordinator implements Closeable {
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
             log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);
 
-            GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
+            FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
             // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
             // for the coordinator in the underlying network client layer
             // TODO: this needs to be better handled in KAFKA-1935
-            Errors error = groupCoordinatorResponse.error();
+            Errors error = findCoordinatorResponse.error();
             clearFindCoordinatorFuture();
             if (error == Errors.NONE) {
                 synchronized (AbstractCoordinator.this) {
                     AbstractCoordinator.this.coordinator = new Node(
-                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
-                            groupCoordinatorResponse.node().host(),
-                            groupCoordinatorResponse.node().port());
+                            Integer.MAX_VALUE - findCoordinatorResponse.node().id(),
+                            findCoordinatorResponse.node().host(),
+                            findCoordinatorResponse.node().port());
                     log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                     client.tryConnect(coordinator);
                     heartbeat.resetTimeouts(time.milliseconds());
@@ -640,7 +640,7 @@ public abstract class AbstractCoordinator implements Closeable {
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
-            client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
+            client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE);
             this.coordinator = null;
         }
     }
@@ -749,8 +749,8 @@ public abstract class AbstractCoordinator implements Closeable {
             if (error == Errors.NONE) {
                 log.debug("Received successful Heartbeat response for group {}", groupId);
                 future.complete(null);
-            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                    || error == Errors.NOT_COORDINATOR) {
                 log.debug("Attempt to heartbeat failed for group {} since coordinator {} is either not started or not valid.",
                         groupId, coordinator());
                 coordinatorDead();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 7d21767..ca2108d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -521,7 +521,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 @Override
                 public void onFailure(RuntimeException e) {
                     pendingAsyncCommits.decrementAndGet();
-                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e.getMessage())));
+                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
+                            RetriableCommitFailedException.withUnderlyingMessage(e.getMessage())));
                 }
             });
         }
@@ -550,7 +551,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Exception commitException = e;
 
                 if (e instanceof RetriableException)
-                    commitException = new RetriableCommitFailedException(e.getMessage());
+                    commitException = RetriableCommitFailedException.withUnderlyingMessage(e.getMessage());
 
                 completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
             }
@@ -754,13 +755,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                     future.raise(error);
                     return;
-                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
                     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     future.raise(error);
                     return;
-                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
-                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
+                } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
+                        || error == Errors.NOT_COORDINATOR
                         || error == Errors.REQUEST_TIMED_OUT) {
                     log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                     coordinatorDead();
@@ -823,10 +824,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 Errors error = response.error();
                 log.debug("Offset fetch for group {} failed: {}", groupId, error.message());
 
-                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+                if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                     // just retry
                     future.raise(error);
-                } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+                } else if (error == Errors.NOT_COORDINATOR) {
                     // re-discover the coordinator and retry
                     coordinatorDead();
                     future.raise(error);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
index 8515c95..f7e8ca1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
@@ -239,7 +239,7 @@ public class RequestFuture<T> implements ConsumerNetworkClient.PollCondition {
     }
 
     public static <T> RequestFuture<T> coordinatorNotAvailable() {
-        return failure(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+        return failure(Errors.COORDINATOR_NOT_AVAILABLE.exception());
     }
 
     public static <T> RequestFuture<T> leaderNotAvailable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
new file mode 100644
index 0000000..4bdb978
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorLoadInProgressException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code for any coordinator request if
+ * it is still loading the group metadata (e.g. after a leader change for that group metadata topic partition).
+ *
+ * In the context of the transactional coordinator, this error will be returned if there is a pending transactional
+ * request with the same transactional id, or if the transaction cache is currently being populated from the transaction
+ * log.
+ */
+public class CoordinatorLoadInProgressException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public CoordinatorLoadInProgressException(String message) {
+        super(message);
+    }
+
+    public CoordinatorLoadInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
new file mode 100644
index 0000000..827ce54
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/CoordinatorNotAvailableException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code for metadata or offset commit
+ * requests if the group metadata topic has not been created yet.
+ *
+ * In the context of the transactional coordinator, this error will be returned if the underlying transactional log
+ * is under replicated or if an append to the log times out.
+ */
+public class CoordinatorNotAvailableException extends RetriableException {
+    public static final CoordinatorNotAvailableException INSTANCE = new CoordinatorNotAvailableException();
+
+    private static final long serialVersionUID = 1L;
+
+    private CoordinatorNotAvailableException() {
+        super();
+    }
+
+    public CoordinatorNotAvailableException(String message) {
+        super(message);
+    }
+
+    public CoordinatorNotAvailableException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
index 3409b68..03a7719 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupCoordinatorNotAvailableException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has
  * not yet been created.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link CoordinatorNotAvailableException}
  */
+@Deprecated
 public class GroupCoordinatorNotAvailableException extends RetriableException {
     public static final GroupCoordinatorNotAvailableException INSTANCE = new GroupCoordinatorNotAvailableException();
 
@@ -41,4 +44,4 @@ public class GroupCoordinatorNotAvailableException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
index f579a37..73daa5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code for any coordinator request if it is still loading the metadata (after a leader change
  * for that offsets topic partition) for this group.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link CoordinatorLoadInProgressException}
  */
+@Deprecated
 public class GroupLoadInProgressException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
@@ -40,4 +43,4 @@ public class GroupLoadInProgressException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
new file mode 100644
index 0000000..12d873e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The transaction coordinator returns this error code if the timeout received via the InitPidRequest is larger than
+ * the `max.transaction.timeout.ms` config value.
+ */
+public class InvalidTxnTimeoutException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public InvalidTxnTimeoutException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public InvalidTxnTimeoutException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
new file mode 100644
index 0000000..00ca32c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * In the context of the group coordinator, the broker returns this error code if it receives an offset fetch
+ * or commit request for a group it's not the coordinator of.
+ *
+ * In the context of the transactional coordinator, it returns this error when it receives a transactional
+ * request with a transactionalId the coordinator doesn't own.
+ */
+public class NotCoordinatorException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public NotCoordinatorException(String message) {
+        super(message);
+    }
+
+    public NotCoordinatorException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
index d2ffaea..cee6495 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForGroupException.java
@@ -19,7 +19,10 @@ package org.apache.kafka.common.errors;
 /**
  * The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is
  * not a coordinator for.
+ *
+ * @deprecated As of Kafka 0.11, this has been replaced by {@link NotCoordinatorException}
  */
+@Deprecated
 public class NotCoordinatorForGroupException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
@@ -40,4 +43,4 @@ public class NotCoordinatorForGroupException extends RetriableException {
         super(cause);
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 63bcfec..9e7ce1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -35,7 +35,7 @@ public enum ApiKeys {
     CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
     OFFSET_COMMIT(8, "OffsetCommit"),
     OFFSET_FETCH(9, "OffsetFetch"),
-    GROUP_COORDINATOR(10, "GroupCoordinator"),
+    FIND_COORDINATOR(10, "FindCoordinator"),
     JOIN_GROUP(11, "JoinGroup"),
     HEARTBEAT(12, "Heartbeat"),
     LEAVE_GROUP(13, "LeaveGroup"),

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index ccebd93..375cf16 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -23,8 +23,8 @@ import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
-import org.apache.kafka.common.errors.GroupLoadInProgressException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
+import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
@@ -42,10 +42,11 @@ import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
-import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
+import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -109,12 +110,12 @@ public enum Errors {
             new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13,
             new NetworkException("The server disconnected before a response was received.")),
-    GROUP_LOAD_IN_PROGRESS(14,
-            new GroupLoadInProgressException("The coordinator is loading and hence can't process requests for this group.")),
-    GROUP_COORDINATOR_NOT_AVAILABLE(15,
-            new GroupCoordinatorNotAvailableException("The group coordinator is not available.")),
-    NOT_COORDINATOR_FOR_GROUP(16,
-            new NotCoordinatorForGroupException("This is not the correct coordinator for this group.")),
+    COORDINATOR_LOAD_IN_PROGRESS(14,
+            new CoordinatorLoadInProgressException("The coordinator is loading and hence can't process requests.")),
+    COORDINATOR_NOT_AVAILABLE(15,
+            new CoordinatorNotAvailableException("The coordinator is not available.")),
+    NOT_COORDINATOR(16,
+            new NotCoordinatorException("This is not the correct coordinator.")),
     INVALID_TOPIC_EXCEPTION(17,
             new InvalidTopicException("The request attempted to perform an operation on an invalid topic.")),
     RECORD_LIST_TOO_LARGE(18,
@@ -182,7 +183,10 @@ public enum Errors {
     INVALID_TXN_STATE(48,
         new InvalidTxnStateException("The producer attempted a transactional operation in an invalid state")),
     INVALID_PID_MAPPING(49,
-        new InvalidPidMappingException("The PID mapping is invalid"));
+        new InvalidPidMappingException("The PID mapping is invalid")),
+    INVALID_TRANSACTION_TIMEOUT(50,
+        new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
+            "(as configured by max.transaction.timeout.ms)."));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 4c58bb8..54f533e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -774,23 +774,43 @@ public class Protocol {
     public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0};
     public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0};
 
-    /* Group coordinator api */
-    public static final Schema GROUP_COORDINATOR_REQUEST_V0 = new Schema(new Field("group_id",
-                                                                                   STRING,
-                                                                                   "The unique group id."));
+    /* Find coordinator api */
+    public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema(
+            new Field("group_id",
+                    STRING,
+                    "The unique group id."));
+
+    public static final Schema FIND_COORDINATOR_REQUEST_V1 = new Schema(
+            new Field("coordinator_key",
+                    STRING,
+                    "Id to use for finding the coordinator (for groups, this is the groupId, " +
+                            "for transactional producers, this is the transactional id)"),
+            new Field("coordinator_type",
+                    INT8,
+                    "The type of coordinator to find (0 = group, 1 = transaction)"));
 
-    public static final Schema GROUP_COORDINATOR_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
-                                                                        new Field("host", STRING, "The hostname of the broker."),
-                                                                        new Field("port", INT32,
-                                                                            "The port on which the broker accepts requests."));
+    public static final Schema FIND_COORDINATOR_BROKER_V0 = new Schema(
+            new Field("node_id", INT32, "The broker id."),
+            new Field("host", STRING, "The hostname of the broker."),
+            new Field("port", INT32,
+                    "The port on which the broker accepts requests."));
 
-    public static final Schema GROUP_COORDINATOR_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
-                                                                          new Field("coordinator",
-                                                                                    GROUP_COORDINATOR_BROKER_V0,
-                                                                                    "Host and port information for the coordinator for a consumer group."));
+    public static final Schema FIND_COORDINATOR_RESPONSE_V0 = new Schema(
+            new Field("error_code", INT16),
+            new Field("coordinator",
+                    FIND_COORDINATOR_BROKER_V0,
+                    "Host and port information for the coordinator for a consumer group."));
+
+    public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema(
+            new Field("error_code", INT16),
+            new Field("error_message", NULLABLE_STRING),
+            new Field("coordinator",
+                    FIND_COORDINATOR_BROKER_V0,
+                    "Host and port information for the coordinator for a consumer group."));
 
-    public static final Schema[] GROUP_COORDINATOR_REQUEST = new Schema[] {GROUP_COORDINATOR_REQUEST_V0};
-    public static final Schema[] GROUP_COORDINATOR_RESPONSE = new Schema[] {GROUP_COORDINATOR_RESPONSE_V0};
+
+    public static final Schema[] FIND_COORDINATOR_REQUEST = new Schema[] {FIND_COORDINATOR_REQUEST_V0, FIND_COORDINATOR_REQUEST_V1};
+    public static final Schema[] FIND_COORDINATOR_RESPONSE = new Schema[] {FIND_COORDINATOR_RESPONSE_V0, FIND_COORDINATOR_RESPONSE_V1};
 
     /* Controlled shutdown api */
     public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id",
@@ -1180,7 +1200,10 @@ public class Protocol {
     public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
             new Field("transactional_id",
                     NULLABLE_STRING,
-                    "The transactional id whose producer id we want to retrieve or generate.")
+                    "The transactional id whose producer id we want to retrieve or generate."),
+            new Field("transaction_timeout_ms",
+                    INT32,
+                    "The time in ms to wait for before aborting idle transactions sent by this producer.")
     );
 
     public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
@@ -1432,7 +1455,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST;
         REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST;
         REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST;
-        REQUESTS[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_REQUEST;
+        REQUESTS[ApiKeys.FIND_COORDINATOR.id] = FIND_COORDINATOR_REQUEST;
         REQUESTS[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_REQUEST;
         REQUESTS[ApiKeys.HEARTBEAT.id] = HEARTBEAT_REQUEST;
         REQUESTS[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_REQUEST;
@@ -1462,7 +1485,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE;
         RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE;
-        RESPONSES[ApiKeys.GROUP_COORDINATOR.id] = GROUP_COORDINATOR_RESPONSE;
+        RESPONSES[ApiKeys.FIND_COORDINATOR.id] = FIND_COORDINATOR_RESPONSE;
         RESPONSES[ApiKeys.JOIN_GROUP.id] = JOIN_GROUP_RESPONSE;
         RESPONSES[ApiKeys.HEARTBEAT.id] = HEARTBEAT_RESPONSE;
         RESPONSES[ApiKeys.LEAVE_GROUP.id] = LEAVE_GROUP_RESPONSE;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index bd4bc49..07bde63 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -123,8 +123,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case OFFSET_FETCH:
                 request = new OffsetFetchRequest(struct, version);
                 break;
-            case GROUP_COORDINATOR:
-                request = new GroupCoordinatorRequest(struct, version);
+            case FIND_COORDINATOR:
+                request = new FindCoordinatorRequest(struct, version);
                 break;
             case JOIN_GROUP:
                 request = new JoinGroupRequest(struct, version);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 433539c..2286783 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -61,8 +61,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new OffsetCommitResponse(struct);
             case OFFSET_FETCH:
                 return new OffsetFetchResponse(struct);
-            case GROUP_COORDINATOR:
-                return new GroupCoordinatorResponse(struct);
+            case FIND_COORDINATOR:
+                return new FindCoordinatorResponse(struct);
             case JOIN_GROUP:
                 return new JoinGroupResponse(struct);
             case HEARTBEAT:

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 5496d66..797ed58 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -51,9 +51,9 @@ public class DescribeGroupsResponse extends AbstractResponse {
     /**
      * Possible per-group error codes:
      *
-     * GROUP_LOAD_IN_PROGRESS (14)
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * AUTHORIZATION_FAILED (29)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
new file mode 100644
index 0000000..46f3426
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorRequest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class FindCoordinatorRequest extends AbstractRequest {
+    private static final String GROUP_ID_KEY_NAME = "group_id";
+    private static final String COORDINATOR_KEY_KEY_NAME = "coordinator_key";
+    private static final String COORDINATOR_TYPE_KEY_NAME = "coordinator_type";
+
+    public static class Builder extends AbstractRequest.Builder<FindCoordinatorRequest> {
+        private final String coordinatorKey;
+        private final CoordinatorType coordinatorType;
+        private final short minVersion;
+
+        public Builder(CoordinatorType coordinatorType, String coordinatorKey) {
+            super(ApiKeys.FIND_COORDINATOR);
+            this.coordinatorType = coordinatorType;
+            this.coordinatorKey = coordinatorKey;
+            this.minVersion = coordinatorType == CoordinatorType.TRANSACTION ? (short) 1 : (short) 0;
+        }
+
+        @Override
+        public FindCoordinatorRequest build(short version) {
+            if (version < minVersion)
+                throw new UnsupportedVersionException("Cannot create a v" + version + " FindCoordinator request " +
+                        "because we require features supported only in " + minVersion + " or later.");
+            return new FindCoordinatorRequest(coordinatorType, coordinatorKey, version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=FindCoordinatorRequest, coordinatorKey=");
+            bld.append(coordinatorKey);
+            bld.append(", coordinatorType=");
+            bld.append(coordinatorType);
+            bld.append(")");
+            return bld.toString();
+        }
+    }
+
+    private final String coordinatorKey;
+    private final CoordinatorType coordinatorType;
+
+    private FindCoordinatorRequest(CoordinatorType coordinatorType, String coordinatorKey, short version) {
+        super(version);
+        this.coordinatorType = coordinatorType;
+        this.coordinatorKey = coordinatorKey;
+    }
+
+    public FindCoordinatorRequest(Struct struct, short version) {
+        super(version);
+
+        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
+            this.coordinatorType = CoordinatorType.forId(struct.getByte(COORDINATOR_TYPE_KEY_NAME));
+        else
+            this.coordinatorType = CoordinatorType.GROUP;
+        if (struct.hasField(GROUP_ID_KEY_NAME))
+            this.coordinatorKey = struct.getString(GROUP_ID_KEY_NAME);
+        else
+            this.coordinatorKey = struct.getString(COORDINATOR_KEY_KEY_NAME);
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        short versionId = version();
+        switch (versionId) {
+            case 0:
+            case 1:
+                return new FindCoordinatorResponse(Errors.forException(e), Node.noNode());
+
+            default:
+                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
+                        versionId, this.getClass().getSimpleName(), ApiKeys.FIND_COORDINATOR.latestVersion()));
+        }
+    }
+
+    public String coordinatorKey() {
+        return coordinatorKey;
+    }
+
+    public CoordinatorType coordinatorType() {
+        return coordinatorType;
+    }
+
+    public static FindCoordinatorRequest parse(ByteBuffer buffer, short version) {
+        return new FindCoordinatorRequest(ApiKeys.FIND_COORDINATOR.parseRequest(version, buffer), version);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.requestSchema(version()));
+        if (struct.hasField(GROUP_ID_KEY_NAME))
+            struct.set(GROUP_ID_KEY_NAME, coordinatorKey);
+        else
+            struct.set(COORDINATOR_KEY_KEY_NAME, coordinatorKey);
+        if (struct.hasField(COORDINATOR_TYPE_KEY_NAME))
+            struct.set(COORDINATOR_TYPE_KEY_NAME, coordinatorType.id);
+        return struct;
+    }
+
+    public enum CoordinatorType {
+        GROUP((byte) 0), TRANSACTION((byte) 1);
+
+        final byte id;
+
+        CoordinatorType(byte id) {
+            this.id = id;
+        }
+
+        public static CoordinatorType forId(byte id) {
+            switch (id) {
+                case 0:
+                    return GROUP;
+                case 1:
+                    return TRANSACTION;
+                default:
+                    throw new IllegalArgumentException("Unknown coordinator type received: " + id);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
new file mode 100644
index 0000000..f96f123
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class FindCoordinatorResponse extends AbstractResponse {
+
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
+    private static final String COORDINATOR_KEY_NAME = "coordinator";
+
+    /**
+     * Possible error codes:
+     *
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
+     * GROUP_AUTHORIZATION_FAILED (30)
+     */
+
+    // coordinator level field names
+    private static final String NODE_ID_KEY_NAME = "node_id";
+    private static final String HOST_KEY_NAME = "host";
+    private static final String PORT_KEY_NAME = "port";
+
+    private final String errorMessage;
+    private final Errors error;
+    private final Node node;
+
+    public FindCoordinatorResponse(Errors error, Node node) {
+        this.error = error;
+        this.node = node;
+        this.errorMessage = null;
+    }
+
+    public FindCoordinatorResponse(Struct struct) {
+        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
+            errorMessage = struct.getString(ERROR_MESSAGE_KEY_NAME);
+        else
+            errorMessage = null;
+
+        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
+        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
+        String host = broker.getString(HOST_KEY_NAME);
+        int port = broker.getInt(PORT_KEY_NAME);
+        node = new Node(nodeId, host, port);
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.FIND_COORDINATOR.responseSchema(version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        if (struct.hasField(ERROR_MESSAGE_KEY_NAME))
+            struct.set(ERROR_MESSAGE_KEY_NAME, errorMessage);
+
+        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
+        coordinator.set(NODE_ID_KEY_NAME, node.id());
+        coordinator.set(HOST_KEY_NAME, node.host());
+        coordinator.set(PORT_KEY_NAME, node.port());
+        struct.set(COORDINATOR_KEY_NAME, coordinator);
+        return struct;
+    }
+
+    public static FindCoordinatorResponse parse(ByteBuffer buffer, short version) {
+        return new FindCoordinatorResponse(ApiKeys.FIND_COORDINATOR.responseSchema(version).read(buffer));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
deleted file mode 100644
index b45054c..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupCoordinatorRequest extends AbstractRequest {
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-
-    public static class Builder extends AbstractRequest.Builder<GroupCoordinatorRequest> {
-        private final String groupId;
-
-        public Builder(String groupId) {
-            super(ApiKeys.GROUP_COORDINATOR);
-            this.groupId = groupId;
-        }
-
-        @Override
-        public GroupCoordinatorRequest build(short version) {
-            return new GroupCoordinatorRequest(this.groupId, version);
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder bld = new StringBuilder();
-            bld.append("(type=GroupCoordinatorRequest, groupId=");
-            bld.append(groupId).append(")");
-            return bld.toString();
-        }
-    }
-
-    private final String groupId;
-
-    private GroupCoordinatorRequest(String groupId, short version) {
-        super(version);
-        this.groupId = groupId;
-    }
-
-    public GroupCoordinatorRequest(Struct struct, short versionId) {
-        super(versionId);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractResponse getErrorResponse(Throwable e) {
-        short versionId = version();
-        switch (versionId) {
-            case 0:
-                return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, Node.noNode());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ApiKeys.GROUP_COORDINATOR.latestVersion()));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public static GroupCoordinatorRequest parse(ByteBuffer buffer, short version) {
-        return new GroupCoordinatorRequest(ApiKeys.GROUP_COORDINATOR.parseRequest(version, buffer), version);
-    }
-
-    @Override
-    protected Struct toStruct() {
-        Struct struct = new Struct(ApiKeys.GROUP_COORDINATOR.requestSchema(version()));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        return struct;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
deleted file mode 100644
index f8a9f8f..0000000
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.requests;
-
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-public class GroupCoordinatorResponse extends AbstractResponse {
-
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String COORDINATOR_KEY_NAME = "coordinator";
-
-    /**
-     * Possible error codes:
-     *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
-     * GROUP_AUTHORIZATION_FAILED (30)
-     */
-
-
-    // coordinator level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    private final Errors error;
-    private final Node node;
-
-    public GroupCoordinatorResponse(Errors error, Node node) {
-        this.error = error;
-        this.node = node;
-    }
-
-    public GroupCoordinatorResponse(Struct struct) {
-        error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
-        Struct broker = (Struct) struct.get(COORDINATOR_KEY_NAME);
-        int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-        String host = broker.getString(HOST_KEY_NAME);
-        int port = broker.getInt(PORT_KEY_NAME);
-        node = new Node(nodeId, host, port);
-    }
-
-    public Errors error() {
-        return error;
-    }
-
-    public Node node() {
-        return node;
-    }
-
-    @Override
-    protected Struct toStruct(short version) {
-        Struct struct = new Struct(ApiKeys.GROUP_COORDINATOR.responseSchema(version));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
-        coordinator.set(NODE_ID_KEY_NAME, node.id());
-        coordinator.set(HOST_KEY_NAME, node.host());
-        coordinator.set(PORT_KEY_NAME, node.port());
-        struct.set(COORDINATOR_KEY_NAME, coordinator);
-        return struct;
-    }
-
-    public static GroupCoordinatorResponse parse(ByteBuffer buffer, short version) {
-        return new GroupCoordinatorResponse(ApiKeys.GROUP_COORDINATOR.parseResponse(version, buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 18d63f8..9bc400c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -30,7 +30,7 @@ public class HeartbeatResponse extends AbstractResponse {
      * Possible error codes:
      *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index 284107f..dedbc0f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -23,27 +23,45 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class InitPidRequest extends AbstractRequest {
+    public static final int NO_TRANSACTION_TIMEOUT_MS = Integer.MAX_VALUE;
+
     private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+    private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
+
 
     private final String transactionalId;
+    private final int transactionTimeoutMs;
 
     public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
         private final String transactionalId;
+        private final int transactionTimeoutMs;
+
         public Builder(String transactionalId) {
+            this(transactionalId, NO_TRANSACTION_TIMEOUT_MS);
+        }
+
+        public Builder(String transactionalId, int transactionTimeoutMs) {
             super(ApiKeys.INIT_PRODUCER_ID);
+
+            if (transactionTimeoutMs <= 0)
+                throw new IllegalArgumentException("transaction timeout value is not positive: " + transactionTimeoutMs);
+
             if (transactionalId != null && transactionalId.isEmpty())
                 throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+
             this.transactionalId = transactionalId;
+            this.transactionTimeoutMs = transactionTimeoutMs;
         }
 
         @Override
         public InitPidRequest build(short version) {
-            return new InitPidRequest(this.transactionalId, version);
+            return new InitPidRequest(version, transactionalId, transactionTimeoutMs);
         }
 
         @Override
         public String toString() {
-            return "(type=InitPidRequest)";
+            return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
+                    transactionTimeoutMs + ")";
         }
 
     }
@@ -51,11 +69,13 @@ public class InitPidRequest extends AbstractRequest {
     public InitPidRequest(Struct struct, short version) {
         super(version);
         this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+        this.transactionTimeoutMs = struct.getInt(TRANSACTION_TIMEOUT_KEY_NAME);
     }
 
-    private InitPidRequest(String transactionalId, short version) {
+    private InitPidRequest(short version, String transactionalId, int transactionTimeoutMs) {
         super(version);
         this.transactionalId = transactionalId;
+        this.transactionTimeoutMs = transactionTimeoutMs;
     }
 
     @Override
@@ -71,10 +91,15 @@ public class InitPidRequest extends AbstractRequest {
         return transactionalId;
     }
 
+    public int transactionTimeoutMs() {
+        return transactionTimeoutMs;
+    }
+
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
         struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        struct.set(TRANSACTION_TIMEOUT_KEY_NAME, transactionTimeoutMs);
         return struct;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index bd55e9b..1f702c7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -33,9 +33,9 @@ public class JoinGroupResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * GROUP_LOAD_IN_PROGRESS (14)
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * INCONSISTENT_GROUP_PROTOCOL (23)
      * UNKNOWN_MEMBER_ID (25)
      * INVALID_SESSION_TIMEOUT (26)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index f2ac33b..d0409ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -35,7 +35,7 @@ public class ListGroupsResponse extends AbstractResponse {
     /**
      * Possible error codes:
      *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
+     * COORDINATOR_NOT_AVAILABLE (15)
      * AUTHORIZATION_FAILED (29)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index b30505b..b1dae37 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -45,9 +45,9 @@ public class OffsetCommitResponse extends AbstractResponse {
      *
      * UNKNOWN_TOPIC_OR_PARTITION (3)
      * OFFSET_METADATA_TOO_LARGE (12)
-     * GROUP_LOAD_IN_PROGRESS (14)
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_LOAD_IN_PROGRESS (14)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index b16b233..69507f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -56,9 +56,9 @@ public class OffsetFetchResponse extends AbstractResponse {
      *   - UNKNOWN_TOPIC_OR_PARTITION (3)
      *
      * - Group or coordinator errors:
-     *   - GROUP_LOAD_IN_PROGRESS (14)
-     *   - GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     *   - NOT_COORDINATOR_FOR_GROUP (16)
+     *   - COORDINATOR_LOAD_IN_PROGRESS (14)
+     *   - COORDINATOR_NOT_AVAILABLE (15)
+     *   - NOT_COORDINATOR (16)
      *   - GROUP_AUTHORIZATION_FAILED (30)
      */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index 5d50c5c..4a06491 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -31,7 +31,7 @@ public class SyncGroupResponse extends AbstractResponse {
      * Possible error codes:
      *
-     * GROUP_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_GROUP (16)
+     * COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR (16)
      * ILLEGAL_GENERATION (22)
      * UNKNOWN_MEMBER_ID (25)
      * REBALANCE_IN_PROGRESS (27)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 3dd3983..6a9f3eb 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.FetchResponse.PartitionData;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.JoinGroupResponse;
@@ -494,7 +494,7 @@ public class KafkaConsumerTest {
         consumer.assign(singletonList(tp0));
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // fetch offset for one topic
@@ -1003,7 +1003,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1068,7 +1068,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1129,7 +1129,7 @@ public class KafkaConsumerTest {
                 rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, true, autoCommitIntervalMs);
 
         // lookup coordinator
-        client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+        client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
         Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
 
         // manual assignment
@@ -1344,7 +1344,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, final Set<String> subscribedTopics, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 
@@ -1367,7 +1367,7 @@ public class KafkaConsumerTest {
     private Node prepareRebalance(MockClient client, Node node, PartitionAssignor assignor, List<TopicPartition> partitions, Node coordinator) {
         if (coordinator == null) {
             // lookup coordinator
-            client.prepareResponseFrom(new GroupCoordinatorResponse(Errors.NONE, node), node);
+            client.prepareResponseFrom(new FindCoordinatorResponse(Errors.NONE, node), node);
             coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0e7c6b9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 62801d0..4779f43 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
-import org.apache.kafka.common.requests.GroupCoordinatorResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatRequest;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
@@ -466,8 +466,8 @@ public class AbstractCoordinatorTest {
         }, 3000, "Should have received a heartbeat request after joining the group");
     }
 
-    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
-        return new GroupCoordinatorResponse(error, node);
+    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
+        return new FindCoordinatorResponse(error, node);
     }
 
     private HeartbeatResponse heartbeatResponse(Errors error) {