You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/21 21:08:49 UTC
[5/8] kafka git commit: KAFKA-2464: client-side assignment for new
consumer
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
new file mode 100644
index 0000000..93994d7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -0,0 +1,749 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.GroupMetadataResponse;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class ConsumerCoordinatorTest {
+
+ private String topicName = "test";
+ private String groupId = "test-group";
+ private TopicPartition tp = new TopicPartition(topicName, 0);
+ private int sessionTimeoutMs = 10;
+ private int heartbeatIntervalMs = 2;
+ private long retryBackoffMs = 100;
+ private long requestTimeoutMs = 5000;
+ private boolean autoCommitEnabled = false;
+ private long autoCommitIntervalMs = 5000;
+ private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
+ private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
+ private MockTime time;
+ private MockClient client;
+ private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
+ private Node node = cluster.nodes().get(0);
+ private SubscriptionState subscriptions;
+ private Metadata metadata;
+ private Metrics metrics;
+ private Map<String, String> metricTags = new LinkedHashMap<>();
+ private ConsumerNetworkClient consumerClient;
+ private MockRebalanceListener rebalanceListener;
+ private MockCommitCallback defaultOffsetCommitCallback;
+ private ConsumerCoordinator coordinator;
+
+ @Before
+ public void setup() {
+ this.time = new MockTime();
+ this.client = new MockClient(time);
+ this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
+ this.metadata = new Metadata(0, Long.MAX_VALUE);
+ this.metadata.update(cluster, time.milliseconds());
+ this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
+ this.metrics = new Metrics(time);
+ this.rebalanceListener = new MockRebalanceListener();
+ this.defaultOffsetCommitCallback = new MockCommitCallback();
+ this.partitionAssignor.clear();
+
+ client.setNode(node);
+
+ this.coordinator = new ConsumerCoordinator(
+ consumerClient,
+ groupId,
+ sessionTimeoutMs,
+ heartbeatIntervalMs,
+ assignors,
+ metadata,
+ subscriptions,
+ metrics,
+ "consumer" + groupId,
+ metricTags,
+ time,
+ requestTimeoutMs,
+ retryBackoffMs,
+ defaultOffsetCommitCallback,
+ autoCommitEnabled,
+ autoCommitIntervalMs);
+ }
+
+ @After
+ public void teardown() {
+ this.metrics.close();
+ }
+
+ @Test
+ public void testNormalHeartbeat() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // normal heartbeat
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.succeeded());
+ }
+
+ @Test
+ public void testCoordinatorNotAvailable() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // GROUP_COORDINATOR_NOT_AVAILABLE will mark coordinator as unknown
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()));
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
+ assertTrue(coordinator.coordinatorUnknown());
+ }
+
+ @Test
+ public void testNotCoordinator() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // not_coordinator will mark coordinator as unknown
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code()));
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), future.exception());
+ assertTrue(coordinator.coordinatorUnknown());
+ }
+
+ @Test
+ public void testIllegalGeneration() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // illegal_generation will cause re-partition
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
+ assertTrue(coordinator.needRejoin());
+ }
+
+ @Test
+ public void testUnknownConsumerId() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // unknown_member_id will cause re-partition
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.changePartitionAssignment(Collections.singletonList(tp));
+
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code()));
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), future.exception());
+ assertTrue(coordinator.needRejoin());
+ }
+
+ @Test
+ public void testCoordinatorDisconnect() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // coordinator disconnect will mark coordinator as unknown
+ time.sleep(sessionTimeoutMs);
+ RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
+ assertEquals(1, consumerClient.pendingRequestCount());
+ assertFalse(future.isDone());
+
+ client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
+ time.sleep(sessionTimeoutMs);
+ consumerClient.poll(0);
+
+ assertTrue(future.isDone());
+ assertTrue(future.failed());
+ assertTrue(future.exception() instanceof DisconnectException);
+ assertTrue(coordinator.coordinatorUnknown());
+ }
+
+ @Test
+ public void testNormalJoinGroupLeader() {
+ final String consumerId = "leader";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ // ensure metadata is up-to-date for leader
+ metadata.setTopics(Arrays.asList(topicName));
+ metadata.update(cluster, time.milliseconds());
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // normal join group
+ Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName));
+ partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp)));
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code()));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ return sync.memberId().equals(consumerId) &&
+ sync.generationId() == 1 &&
+ sync.groupAssignment().containsKey(consumerId);
+ }
+ }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(Collections.emptySet(), rebalanceListener.revoked);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test
+ public void testNormalJoinGroupFollower() {
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // normal join group
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ SyncGroupRequest sync = new SyncGroupRequest(request.request().body());
+ return sync.memberId().equals(consumerId) &&
+ sync.generationId() == 1 &&
+ sync.groupAssignment().isEmpty();
+ }
+ }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+
+ @Test
+ public void testMetadataChangeTriggersRebalance() {
+ final String consumerId = "consumer";
+
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+
+ coordinator.ensurePartitionAssignment();
+
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+
+ // a new partition is added to the topic
+ metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds());
+
+ // we should detect the change and ask for reassignment
+ assertTrue(subscriptions.partitionAssignmentNeeded());
+ }
+
+ @Test
+ public void testRejoinGroup() {
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // join the group once
+ client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(1, rebalanceListener.assignedCount);
+
+ // and join the group again
+ subscriptions.needReassignment();
+ client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ assertEquals(2, rebalanceListener.revokedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.revoked);
+ assertEquals(2, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test
+ public void testDisconnectInJoin() {
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // disconnected from original coordinator will cause re-discover and join again
+ client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+ assertFalse(subscriptions.partitionAssignmentNeeded());
+ assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
+ assertEquals(1, rebalanceListener.revokedCount);
+ assertEquals(1, rebalanceListener.assignedCount);
+ assertEquals(Collections.singleton(tp), rebalanceListener.assigned);
+ }
+
+ @Test(expected = ApiException.class)
+ public void testInvalidSessionTimeout() {
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+ subscriptions.needReassignment();
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // coordinator doesn't like the session timeout
+ client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
+ coordinator.ensurePartitionAssignment();
+ }
+
+ @Test
+ public void testCommitOffsetOnly() {
+ subscriptions.assign(Arrays.asList(tp));
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+ consumerClient.poll(0);
+ assertTrue(success.get());
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testCommitOffsetMetadata() {
+ subscriptions.assign(Arrays.asList(tp));
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
+ consumerClient.poll(0);
+ assertTrue(success.get());
+
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ assertEquals("hello", subscriptions.committed(tp).metadata());
+ }
+
+ @Test
+ public void testCommitOffsetAsyncWithDefaultCallback() {
+ int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+ consumerClient.poll(0);
+ assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+ assertNull(defaultOffsetCommitCallback.exception);
+ }
+
+ @Test
+ public void testResetGeneration() {
+ // enable auto-assignment
+ subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
+ client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+ coordinator.ensurePartitionAssignment();
+
+ // now switch to manual assignment
+ subscriptions.unsubscribe();
+ coordinator.resetGeneration();
+ subscriptions.assign(Arrays.asList(tp));
+
+ // the client should not reuse generation/memberId from auto-subscribed generation
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(ClientRequest request) {
+ OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
+ return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
+ commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+ }
+ }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+
+ AtomicBoolean success = new AtomicBoolean(false);
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
+ consumerClient.poll(0);
+ assertTrue(success.get());
+ }
+
+ @Test
+ public void testCommitOffsetAsyncFailedWithDefaultCallback() {
+ int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
+ consumerClient.poll(0);
+ assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
+ assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
+ }
+
+ @Test
+ public void testCommitOffsetAsyncCoordinatorNotAvailable() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // async commit with coordinator not available
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+ consumerClient.poll(0);
+
+ assertTrue(coordinator.coordinatorUnknown());
+ assertEquals(1, cb.invoked);
+ assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
+ }
+
+ @Test
+ public void testCommitOffsetAsyncNotCoordinator() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // async commit with not coordinator
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+ consumerClient.poll(0);
+
+ assertTrue(coordinator.coordinatorUnknown());
+ assertEquals(1, cb.invoked);
+ assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), cb.exception);
+ }
+
+ @Test
+ public void testCommitOffsetAsyncDisconnected() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // async commit with coordinator disconnected
+ MockCommitCallback cb = new MockCommitCallback();
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+ coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
+ consumerClient.poll(0);
+
+ assertTrue(coordinator.coordinatorUnknown());
+ assertEquals(1, cb.invoked);
+ assertTrue(cb.exception instanceof DisconnectException);
+ }
+
+ @Test
+ public void testCommitOffsetSyncNotCoordinator() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with not coordinator (should re-discover the coordinator and then retry the commit request)
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code())));
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+ }
+
+ @Test
+ public void testCommitOffsetSyncCoordinatorNotAvailable() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with coordinator not available (should re-discover the coordinator and then retry the commit request)
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+ }
+
+ @Test
+ public void testCommitOffsetSyncCoordinatorDisconnected() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+ }
+
+ @Test(expected = OffsetMetadataTooLarge.class)
+ public void testCommitOffsetMetadataTooLarge() {
+ // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
+ }
+
+ @Test(expected = ApiException.class)
+ public void testCommitOffsetSyncCallbackWithNonRetriableException() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ // sync commit that fails with a non-retriable error (UNKNOWN) should throw the exception to the caller
+ client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.UNKNOWN.code())), false);
+ coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
+ }
+
+ @Test
+ public void testRefreshOffset() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ subscriptions.assign(Arrays.asList(tp));
+ subscriptions.needRefreshCommits();
+ client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testRefreshOffsetLoadInProgress() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ subscriptions.assign(Arrays.asList(tp));
+ subscriptions.needRefreshCommits();
+ client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
+ client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testRefreshOffsetNotCoordinatorForConsumer() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ subscriptions.assign(Arrays.asList(tp));
+ subscriptions.needRefreshCommits();
+ client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L));
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(100L, subscriptions.committed(tp).offset());
+ }
+
+ @Test
+ public void testRefreshOffsetWithNoFetchableOffsets() {
+ client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+ coordinator.ensureCoordinatorKnown();
+
+ subscriptions.assign(Arrays.asList(tp));
+ subscriptions.needRefreshCommits();
+ client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
+ coordinator.refreshCommittedOffsetsIfNeeded();
+ assertFalse(subscriptions.refreshCommitsNeeded());
+ assertEquals(null, subscriptions.committed(tp));
+ }
+
+ private Struct consumerMetadataResponse(Node node, short error) {
+ GroupMetadataResponse response = new GroupMetadataResponse(error, node);
+ return response.toStruct();
+ }
+
+ private Struct heartbeatResponse(short error) {
+ HeartbeatResponse response = new HeartbeatResponse(error);
+ return response.toStruct();
+ }
+
+ private Struct joinGroupLeaderResponse(int generationId, String memberId,
+ Map<String, List<String>> subscriptions,
+ short error) {
+ Map<String, ByteBuffer> metadata = new HashMap<>();
+ for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
+ PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
+ ByteBuffer buf = ConsumerProtocol.serializeSubscription(subscription);
+ metadata.put(subscriptionEntry.getKey(), buf);
+ }
+ return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, memberId, metadata).toStruct();
+ }
+
+ private Struct joinGroupFollowerResponse(int generationId, String memberId, String leaderId, short error) {
+ return new JoinGroupResponse(error, generationId, partitionAssignor.name(), memberId, leaderId,
+ Collections.<String, ByteBuffer>emptyMap()).toStruct();
+ }
+
+ private Struct syncGroupResponse(List<TopicPartition> partitions, short error) {
+ ByteBuffer buf = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+ return new SyncGroupResponse(error, buf).toStruct();
+ }
+
+ private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
+ OffsetCommitResponse response = new OffsetCommitResponse(responseData);
+ return response.toStruct();
+ }
+
+ private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
+ OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
+ OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
+ return response.toStruct();
+ }
+
+ private OffsetCommitCallback callback(final AtomicBoolean success) {
+ return new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ if (exception == null)
+ success.set(true);
+ }
+ };
+ }
+
+ private static class MockCommitCallback implements OffsetCommitCallback {
+ public int invoked = 0;
+ public Exception exception = null;
+
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ invoked++;
+ this.exception = exception;
+ }
+ }
+
+ private static class MockRebalanceListener implements ConsumerRebalanceListener {
+ public Collection<TopicPartition> revoked;
+ public Collection<TopicPartition> assigned;
+ public int revokedCount = 0;
+ public int assignedCount = 0;
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+ this.assigned = partitions;
+ assignedCount++;
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+ this.revoked = partitions;
+ revokedCount++;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
index 9de1cee..6a42058 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
@@ -106,7 +106,7 @@ public class ConsumerNetworkClientTest {
private HeartbeatRequest heartbeatRequest() {
- return new HeartbeatRequest("group", 1, "consumerId");
+ return new HeartbeatRequest("group", 1, "memberId");
}
private Struct heartbeatResponse(short error) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
new file mode 100644
index 0000000..8113770
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConsumerProtocolTest {
+
+ @Test
+ public void serializeDeserializeMetadata() {
+ Subscription subscription = new Subscription(Arrays.asList("foo", "bar"));
+
+ ByteBuffer buffer = ConsumerProtocol.serializeSubscription(subscription);
+ Subscription parsedSubscription = ConsumerProtocol.deserializeSubscription(buffer);
+ assertEquals(subscription.topics(), parsedSubscription.topics());
+ }
+
+ @Test
+ public void deserializeNewSubscriptionVersion() {
+ // verify that a new version which adds a field is still parseable
+ short version = 100;
+
+ Schema subscriptionSchemaV100 = new Schema(
+ new Field(ConsumerProtocol.TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
+ new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES),
+ new Field("foo", Type.STRING));
+
+ Struct subscriptionV100 = new Struct(subscriptionSchemaV100);
+ subscriptionV100.set(ConsumerProtocol.TOPICS_KEY_NAME, new Object[]{"topic"});
+ subscriptionV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
+ subscriptionV100.set("foo", "bar");
+
+ Struct headerV100 = new Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA);
+ headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version);
+
+ ByteBuffer buffer = ByteBuffer.allocate(subscriptionV100.sizeOf() + headerV100.sizeOf());
+ headerV100.writeTo(buffer);
+ subscriptionV100.writeTo(buffer);
+
+ buffer.flip();
+
+ Subscription subscription = ConsumerProtocol.deserializeSubscription(buffer);
+ assertEquals(Arrays.asList("topic"), subscription.topics());
+ }
+
+ @Test
+ public void serializeDeserializeAssignment() {
+ List<TopicPartition> partitions = Arrays.asList(new TopicPartition("foo", 0), new TopicPartition("bar", 2));
+ ByteBuffer buffer = ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(partitions));
+ PartitionAssignor.Assignment parsedAssignment = ConsumerProtocol.deserializeAssignment(buffer);
+ assertEquals(toSet(partitions), toSet(parsedAssignment.partitions()));
+ }
+
+ @Test
+ public void deserializeNewAssignmentVersion() {
+ // verify that a new version which adds a field is still parseable
+ short version = 100;
+
+ Schema assignmentSchemaV100 = new Schema(
+ new Field(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(ConsumerProtocol.TOPIC_ASSIGNMENT_V0)),
+ new Field(ConsumerProtocol.USER_DATA_KEY_NAME, Type.BYTES),
+ new Field("foo", Type.STRING));
+
+ Struct assignmentV100 = new Struct(assignmentSchemaV100);
+ assignmentV100.set(ConsumerProtocol.TOPIC_PARTITIONS_KEY_NAME,
+ new Object[]{new Struct(ConsumerProtocol.TOPIC_ASSIGNMENT_V0)
+ .set(ConsumerProtocol.TOPIC_KEY_NAME, "foo")
+ .set(ConsumerProtocol.PARTITIONS_KEY_NAME, new Object[]{1})});
+ assignmentV100.set(ConsumerProtocol.USER_DATA_KEY_NAME, ByteBuffer.wrap(new byte[0]));
+ assignmentV100.set("foo", "bar");
+
+ Struct headerV100 = new Struct(ConsumerProtocol.CONSUMER_PROTOCOL_HEADER_SCHEMA);
+ headerV100.set(ConsumerProtocol.VERSION_KEY_NAME, version);
+
+ ByteBuffer buffer = ByteBuffer.allocate(assignmentV100.sizeOf() + headerV100.sizeOf());
+ headerV100.writeTo(buffer);
+ assignmentV100.writeTo(buffer);
+
+ buffer.flip();
+
+ PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(buffer);
+ assertEquals(toSet(Arrays.asList(new TopicPartition("foo", 1))), toSet(assignment.partitions()));
+ }
+
+ private static <T> Set<T> toSet(Collection<T> collection) {
+ return new HashSet<>(collection);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
deleted file mode 100644
index 66b2e32..0000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ /dev/null
@@ -1,635 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.kafka.clients.ClientRequest;
-import org.apache.kafka.clients.Metadata;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ApiException;
-import org.apache.kafka.common.errors.DisconnectException;
-import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.ConsumerMetadataResponse;
-import org.apache.kafka.common.requests.HeartbeatResponse;
-import org.apache.kafka.common.requests.JoinGroupResponse;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
-import org.apache.kafka.common.requests.OffsetCommitResponse;
-import org.apache.kafka.common.requests.OffsetFetchResponse;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.test.TestUtils;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-
-public class CoordinatorTest {
-
- private String topicName = "test";
- private String groupId = "test-group";
- private TopicPartition tp = new TopicPartition(topicName, 0);
- private int sessionTimeoutMs = 10;
- private int heartbeatIntervalMs = 2;
- private long retryBackoffMs = 100;
- private long requestTimeoutMs = 5000;
- private boolean autoCommitEnabled = false;
- private long autoCommitIntervalMs = 5000;
- private String rebalanceStrategy = "not-matter";
- private MockTime time;
- private MockClient client;
- private Cluster cluster = TestUtils.singletonCluster(topicName, 1);
- private Node node = cluster.nodes().get(0);
- private SubscriptionState subscriptions;
- private Metadata metadata;
- private Metrics metrics;
- private Map<String, String> metricTags = new LinkedHashMap<String, String>();
- private ConsumerNetworkClient consumerClient;
- private MockRebalanceListener subscriptionListener;
- private MockCommitCallback defaultOffsetCommitCallback;
- private Coordinator coordinator;
-
- @Before
- public void setup() {
- this.time = new MockTime();
- this.client = new MockClient(time);
- this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
- this.metadata = new Metadata(0, Long.MAX_VALUE);
- this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100);
- this.metrics = new Metrics(time);
- this.subscriptionListener = new MockRebalanceListener();
- this.defaultOffsetCommitCallback = new MockCommitCallback();
-
- client.setNode(node);
-
- this.coordinator = new Coordinator(consumerClient,
- groupId,
- sessionTimeoutMs,
- heartbeatIntervalMs,
- rebalanceStrategy,
- subscriptions,
- metrics,
- "consumer" + groupId,
- metricTags,
- time,
- requestTimeoutMs,
- retryBackoffMs,
- defaultOffsetCommitCallback,
- autoCommitEnabled,
- autoCommitIntervalMs);
- }
-
- @After
- public void teardown() {
- this.metrics.close();
- }
-
- @Test
- public void testNormalHeartbeat() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // normal heartbeat
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.succeeded());
- }
-
- @Test
- public void testCoordinatorNotAvailable() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // consumer_coordinator_not_available will mark coordinator as unknown
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code()));
- time.sleep(sessionTimeoutMs);
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.failed());
- assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), future.exception());
- assertTrue(coordinator.coordinatorUnknown());
- }
-
- @Test
- public void testNotCoordinator() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // not_coordinator will mark coordinator as unknown
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_CONSUMER.code()));
- time.sleep(sessionTimeoutMs);
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.failed());
- assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), future.exception());
- assertTrue(coordinator.coordinatorUnknown());
- }
-
- @Test
- public void testIllegalGeneration() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // illegal_generation will cause re-partition
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.changePartitionAssignment(Collections.singletonList(tp));
-
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
- time.sleep(sessionTimeoutMs);
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.failed());
- assertEquals(Errors.ILLEGAL_GENERATION.exception(), future.exception());
- assertTrue(subscriptions.partitionAssignmentNeeded());
- }
-
- @Test
- public void testUnknownConsumerId() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // illegal_generation will cause re-partition
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.changePartitionAssignment(Collections.singletonList(tp));
-
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_CONSUMER_ID.code()));
- time.sleep(sessionTimeoutMs);
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.failed());
- assertEquals(Errors.UNKNOWN_CONSUMER_ID.exception(), future.exception());
- assertTrue(subscriptions.partitionAssignmentNeeded());
- }
-
- @Test
- public void testCoordinatorDisconnect() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // coordinator disconnect will mark coordinator as unknown
- time.sleep(sessionTimeoutMs);
- RequestFuture<Void> future = coordinator.sendHeartbeatRequest(); // should send out the heartbeat
- assertEquals(1, consumerClient.pendingRequestCount());
- assertFalse(future.isDone());
-
- client.prepareResponse(heartbeatResponse(Errors.NONE.code()), true); // return disconnected
- time.sleep(sessionTimeoutMs);
- consumerClient.poll(0);
-
- assertTrue(future.isDone());
- assertTrue(future.failed());
- assertTrue(future.exception() instanceof DisconnectException);
- assertTrue(coordinator.coordinatorUnknown());
- }
-
- @Test
- public void testNormalJoinGroup() {
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.needReassignment();
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // normal join group
- client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- coordinator.ensurePartitionAssignment();
-
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
- assertEquals(1, subscriptionListener.revokedCount);
- assertEquals(Collections.emptySet(), subscriptionListener.revoked);
- assertEquals(1, subscriptionListener.assignedCount);
- assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
- }
-
- @Test
- public void testReJoinGroup() {
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.needReassignment();
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // disconnected from original coordinator will cause re-discover and join again
- client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()), true);
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- coordinator.ensurePartitionAssignment();
- assertFalse(subscriptions.partitionAssignmentNeeded());
- assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions());
- assertEquals(1, subscriptionListener.revokedCount);
- assertEquals(Collections.emptySet(), subscriptionListener.revoked);
- assertEquals(1, subscriptionListener.assignedCount);
- assertEquals(Collections.singleton(tp), subscriptionListener.assigned);
- }
-
- @Test(expected = ApiException.class)
- public void testUnknownPartitionAssignmentStrategy() {
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.needReassignment();
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // coordinator doesn't like our assignment strategy
- client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code()));
- coordinator.ensurePartitionAssignment();
- }
-
- @Test(expected = ApiException.class)
- public void testInvalidSessionTimeout() {
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
- subscriptions.needReassignment();
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // coordinator doesn't like our assignment strategy
- client.prepareResponse(joinGroupResponse(0, "consumer", Collections.<TopicPartition>emptyList(), Errors.INVALID_SESSION_TIMEOUT.code()));
- coordinator.ensurePartitionAssignment();
- }
-
- @Test
- public void testCommitOffsetOnly() {
- subscriptions.assign(Arrays.asList(tp));
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
- AtomicBoolean success = new AtomicBoolean(false);
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
- consumerClient.poll(0);
- assertTrue(success.get());
-
- assertEquals(100L, subscriptions.committed(tp).offset());
- }
-
- @Test
- public void testCommitOffsetMetadata() {
- subscriptions.assign(Arrays.asList(tp));
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
- AtomicBoolean success = new AtomicBoolean(false);
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "hello")), callback(success));
- consumerClient.poll(0);
- assertTrue(success.get());
-
- assertEquals(100L, subscriptions.committed(tp).offset());
- assertEquals("hello", subscriptions.committed(tp).metadata());
- }
-
- @Test
- public void testCommitOffsetAsyncWithDefaultCallback() {
- int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
- consumerClient.poll(0);
- assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
- assertNull(defaultOffsetCommitCallback.exception);
- }
-
- @Test
- public void testResetGeneration() {
- // enable auto-assignment
- subscriptions.subscribe(Arrays.asList(topicName), subscriptionListener);
-
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- client.prepareResponse(joinGroupResponse(1, "consumer", Collections.singletonList(tp), Errors.NONE.code()));
- coordinator.ensurePartitionAssignment();
-
- // now switch to manual assignment
- subscriptions.unsubscribe();
- coordinator.resetGeneration();
- subscriptions.assign(Arrays.asList(tp));
-
- // the client should not reuse generation/consumerId from auto-subscribed generation
- client.prepareResponse(new MockClient.RequestMatcher() {
- @Override
- public boolean matches(ClientRequest request) {
- OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
- return commitRequest.consumerId().equals(OffsetCommitRequest.DEFAULT_CONSUMER_ID) &&
- commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
- }
- }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
-
- AtomicBoolean success = new AtomicBoolean(false);
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), callback(success));
- consumerClient.poll(0);
- assertTrue(success.get());
- }
-
- @Test
- public void testCommitOffsetAsyncFailedWithDefaultCallback() {
- int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), null);
- consumerClient.poll(0);
- assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked);
- assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), defaultOffsetCommitCallback.exception);
- }
-
- @Test
- public void testCommitOffsetAsyncCoordinatorNotAvailable() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // async commit with coordinator not available
- MockCommitCallback cb = new MockCommitCallback();
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
-
- assertTrue(coordinator.coordinatorUnknown());
- assertEquals(1, cb.invoked);
- assertEquals(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception(), cb.exception);
- }
-
- @Test
- public void testCommitOffsetAsyncNotCoordinator() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // async commit with not coordinator
- MockCommitCallback cb = new MockCommitCallback();
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
-
- assertTrue(coordinator.coordinatorUnknown());
- assertEquals(1, cb.invoked);
- assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.exception(), cb.exception);
- }
-
- @Test
- public void testCommitOffsetAsyncDisconnected() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // async commit with coordinator disconnected
- MockCommitCallback cb = new MockCommitCallback();
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
- coordinator.commitOffsetsAsync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)), cb);
- consumerClient.poll(0);
-
- assertTrue(coordinator.coordinatorUnknown());
- assertEquals(1, cb.invoked);
- assertTrue(cb.exception instanceof DisconnectException);
- }
-
- @Test
- public void testCommitOffsetSyncNotCoordinator() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code())));
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
- }
-
- @Test
- public void testCommitOffsetSyncCoordinatorNotAvailable() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code())));
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
- }
-
- @Test
- public void testCommitOffsetSyncCoordinatorDisconnected() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())), true);
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
- coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
- }
-
- @Test(expected = OffsetMetadataTooLarge.class)
- public void testCommitOffsetMetadataTooLarge() {
- // since offset metadata is provided by the user, we have to propagate the exception so they can handle it
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.OFFSET_METADATA_TOO_LARGE.code())));
- coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L, "metadata")));
- }
-
- @Test(expected = ApiException.class)
- public void testCommitOffsetSyncCallbackWithNonRetriableException() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- // sync commit with invalid partitions should throw if we have no callback
- client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code())), false);
- coordinator.commitOffsetsSync(Collections.singletonMap(tp, new OffsetAndMetadata(100L)));
- }
-
- @Test
- public void testRefreshOffset() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- subscriptions.assign(Arrays.asList(tp));
- subscriptions.needRefreshCommits();
- client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- coordinator.refreshCommittedOffsetsIfNeeded();
- assertFalse(subscriptions.refreshCommitsNeeded());
- assertEquals(100L, subscriptions.committed(tp).offset());
- }
-
- @Test
- public void testRefreshOffsetLoadInProgress() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- subscriptions.assign(Arrays.asList(tp));
- subscriptions.needRefreshCommits();
- client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
- client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- coordinator.refreshCommittedOffsetsIfNeeded();
- assertFalse(subscriptions.refreshCommitsNeeded());
- assertEquals(100L, subscriptions.committed(tp).offset());
- }
-
- @Test
- public void testRefreshOffsetNotCoordinatorForConsumer() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- subscriptions.assign(Arrays.asList(tp));
- subscriptions.needRefreshCommits();
- client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_CONSUMER.code(), "", 100L));
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
- coordinator.refreshCommittedOffsetsIfNeeded();
- assertFalse(subscriptions.refreshCommitsNeeded());
- assertEquals(100L, subscriptions.committed(tp).offset());
- }
-
- @Test
- public void testRefreshOffsetWithNoFetchableOffsets() {
- client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
- coordinator.ensureCoordinatorKnown();
-
- subscriptions.assign(Arrays.asList(tp));
- subscriptions.needRefreshCommits();
- client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
- coordinator.refreshCommittedOffsetsIfNeeded();
- assertFalse(subscriptions.refreshCommitsNeeded());
- assertEquals(null, subscriptions.committed(tp));
- }
-
- private Struct consumerMetadataResponse(Node node, short error) {
- ConsumerMetadataResponse response = new ConsumerMetadataResponse(error, node);
- return response.toStruct();
- }
-
- private Struct heartbeatResponse(short error) {
- HeartbeatResponse response = new HeartbeatResponse(error);
- return response.toStruct();
- }
-
- private Struct joinGroupResponse(int generationId, String consumerId, List<TopicPartition> assignedPartitions, short error) {
- JoinGroupResponse response = new JoinGroupResponse(error, generationId, consumerId, assignedPartitions);
- return response.toStruct();
- }
-
- private Struct offsetCommitResponse(Map<TopicPartition, Short> responseData) {
- OffsetCommitResponse response = new OffsetCommitResponse(responseData);
- return response.toStruct();
- }
-
- private Struct offsetFetchResponse(TopicPartition tp, Short error, String metadata, long offset) {
- OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, error);
- OffsetFetchResponse response = new OffsetFetchResponse(Collections.singletonMap(tp, data));
- return response.toStruct();
- }
-
- private OffsetCommitCallback callback(final AtomicBoolean success) {
- return new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if (exception == null)
- success.set(true);
- }
- };
- }
-
- private static class MockCommitCallback implements OffsetCommitCallback {
- public int invoked = 0;
- public Exception exception = null;
-
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- invoked++;
- this.exception = exception;
- }
- }
-
- private static class MockRebalanceListener implements ConsumerRebalanceListener {
- public Collection<TopicPartition> revoked;
- public Collection<TopicPartition> assigned;
- public int revokedCount = 0;
- public int assignedCount = 0;
-
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
- this.assigned = partitions;
- assignedCount++;
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
- this.revoked = partitions;
- revokedCount++;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4929449..8773f8c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -350,11 +350,11 @@ public class FetcherTest {
@Test
public void testGetAllTopics() throws InterruptedException {
- // sending response before request, as getAllTopics is a blocking call
+ // sending response before request, as getTopicMetadata is a blocking call
client.prepareResponse(
new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct());
- Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopics(5000L);
+ Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L);
assertEquals(cluster.topics().size(), allTopics.size());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
new file mode 100644
index 0000000..40ae661
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MockPartitionAssignor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Map;
+
+public class MockPartitionAssignor extends AbstractPartitionAssignor {
+
+ private Map<String, List<TopicPartition>> result = null;
+
+ @Override
+ public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
+ Map<String, List<String>> subscriptions) {
+ if (result == null)
+ throw new IllegalStateException("Call to assign with no result prepared");
+ return result;
+ }
+
+ @Override
+ public String name() {
+ return "consumer-mock-assignor";
+ }
+
+ public void clear() {
+ this.result = null;
+ }
+
+ public void prepare(Map<String, List<TopicPartition>> result) {
+ this.result = result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index cabf591..fb21802 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -29,9 +29,9 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
@@ -151,11 +151,11 @@ public class RequestResponseTest {
}
private AbstractRequest createConsumerMetadataRequest() {
- return new ConsumerMetadataRequest("test-group");
+ return new GroupMetadataRequest("test-group");
}
private AbstractRequestResponse createConsumerMetadataResponse() {
- return new ConsumerMetadataResponse((short) 1, new Node(10, "host1", 2014));
+ return new GroupMetadataResponse(Errors.NONE.code(), new Node(10, "host1", 2014));
}
private AbstractRequest createFetchRequest() {
@@ -180,11 +180,17 @@ public class RequestResponseTest {
}
private AbstractRequest createJoinGroupRequest() {
- return new JoinGroupRequest("group1", 30000, Arrays.asList("topic1"), "consumer1", "strategy1");
+ ByteBuffer metadata = ByteBuffer.wrap(new byte[] {});
+ List<JoinGroupRequest.GroupProtocol> protocols = new ArrayList<>();
+ protocols.add(new JoinGroupRequest.GroupProtocol("consumer-range", metadata));
+ return new JoinGroupRequest("group1", 30000, "consumer1", "consumer", protocols);
}
private AbstractRequestResponse createJoinGroupResponse() {
- return new JoinGroupResponse(Errors.NONE.code(), 1, "consumer1", Arrays.asList(new TopicPartition("test11", 1), new TopicPartition("test2", 1)));
+ Map<String, ByteBuffer> members = new HashMap<>();
+ members.put("consumer1", ByteBuffer.wrap(new byte[]{}));
+ members.put("consumer2", ByteBuffer.wrap(new byte[]{}));
+ return new JoinGroupResponse(Errors.NONE.code(), 1, "range", "consumer1", "leader", members);
}
private AbstractRequest createLeaveGroupRequest() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
index 1ff5e73..eb62c9e 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -340,7 +340,7 @@ public class KafkaBasedLogTest {
consumer.schedulePollTask(new Runnable() {
@Override
public void run() {
- consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+ consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala
index 6a8a8a2..fd6d420 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -31,7 +31,7 @@ object AclCommand {
val Newline = scala.util.Properties.lineSeparator
val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
Topic -> Set(Read, Write, Describe),
- ConsumerGroup -> Set(Read),
+ Group -> Set(Read),
Cluster -> Set(Create, ClusterAction)
)
@@ -146,14 +146,14 @@ object AclCommand {
val resources = getResource(opts)
val topics: Set[Resource] = getResource(opts).filter(_.resourceType == Topic)
- val consumerGroups: Set[Resource] = resources.filter(_.resourceType == ConsumerGroup)
+ val groups: Set[Resource] = resources.filter(_.resourceType == Group)
//Read,Describe on topic, Read on consumerGroup + Create on cluster
val acls = getAcl(opts, Set(Read, Describe))
topics.map(_ -> acls).toMap[Resource, Set[Acl]] ++
- consumerGroups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
+ groups.map(_ -> getAcl(opts, Set(Read))).toMap[Resource, Set[Acl]]
}
private def getCliResourceToAcls(opts: AclCommandOptions): Map[Resource, Set[Acl]] = {
@@ -221,10 +221,10 @@ object AclCommand {
resources += Resource.ClusterResource
if (opts.options.has(opts.groupOpt))
- opts.options.valuesOf(opts.groupOpt).asScala.foreach(consumerGroup => resources += new Resource(ConsumerGroup, consumerGroup.trim))
+ opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim))
if (resources.isEmpty && dieIfNoResourceFound)
- CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --consumer-group <group>")
+ CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>")
resources
}
@@ -266,16 +266,16 @@ object AclCommand {
.withValuesSeparatedBy(Delimiter)
val clusterOpt = parser.accepts("cluster", "Add/Remove cluster acls.")
- val groupOpt = parser.accepts("consumer-group", "Comma separated list of consumer groups to which the acls should be added or removed. " +
- "A value of * indicates the acls should apply to all consumer-groups.")
+ val groupOpt = parser.accepts("group", "Comma separated list of groups to which the acls should be added or removed. " +
+ "A value of * indicates the acls should apply to all groups.")
.withRequiredArg
- .describedAs("consumer-group")
+ .describedAs("group")
.ofType(classOf[String])
.withValuesSeparatedBy(Delimiter)
val addOpt = parser.accepts("add", "Indicates you are trying to add acls.")
val removeOpt = parser.accepts("remove", "Indicates you are trying to remove acls.")
- val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --consumer-group <group> or --cluster to specify a resource.")
+ val listOpt = parser.accepts("list", "List acls for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.")
val operationsOpt = parser.accepts("operations", "Comma separated list of operations, default is All. Valid operation names are: " + Newline +
Operation.values.map("\t" + _).mkString(Newline) + Newline)
@@ -320,7 +320,7 @@ object AclCommand {
"This will generate acls that allows WRITE,DESCRIBE on topic and CREATE on cluster. ")
val consumerOpt = parser.accepts("consumer", "Convenience option to add/remove acls for consumer role. " +
- "This will generate acls that allows READ,DESCRIBE on topic and READ on consumer-group.")
+ "This will generate acls that allows READ,DESCRIBE on topic and READ on group.")
val helpOpt = parser.accepts("help", "Print usage information.")
@@ -343,7 +343,7 @@ object AclCommand {
CommandLineUtils.printUsageAndDie(parser, "With --producer you must specify a --topic")
if (options.has(consumerOpt) && (!options.has(topicOpt) || !options.has(groupOpt) || (!options.has(producerOpt) && options.has(clusterOpt))))
- CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --consumer-group and no --cluster option should be specified.")
+ CommandLineUtils.printUsageAndDie(parser, "With --consumer you must specify a --topic and a --group and no --cluster option should be specified.")
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index e6ca112..ed54aee 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -23,16 +23,15 @@ import kafka.common.{Topic, AdminCommandFailedException}
import kafka.utils.CommandLineUtils
import kafka.utils._
import kafka.utils.ZkUtils._
-import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import scala.collection._
import scala.collection.JavaConversions._
import kafka.log.{Defaults, LogConfig}
import kafka.consumer.{ConsumerConfig, Whitelist}
-import kafka.server.{ConfigType, OffsetManager}
+import kafka.server.ConfigType
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.security.JaasUtils
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.GroupCoordinator
object TopicCommand extends Logging {
@@ -130,7 +129,7 @@ object TopicCommand extends Logging {
}
if(opts.options.has(opts.partitionsOpt)) {
- if (topic == ConsumerCoordinator.OffsetsTopicName) {
+ if (topic == GroupCoordinator.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
}
println("WARNING: If partitions are increased for a topic that has a key, the partition " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/86eb74d9/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
deleted file mode 100644
index 258d5fe..0000000
--- a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import java.nio.ByteBuffer
-
-import kafka.common.ErrorMapping
-import kafka.network.{RequestOrResponseSend, RequestChannel}
-import kafka.network.RequestChannel.Response
-
-object ConsumerMetadataRequest {
- val CurrentVersion = 0.shortValue
- val DefaultClientId = ""
-
- def readFrom(buffer: ByteBuffer) = {
- // envelope
- val versionId = buffer.getShort
- val correlationId = buffer.getInt
- val clientId = ApiUtils.readShortString(buffer)
-
- // request
- val group = ApiUtils.readShortString(buffer)
- ConsumerMetadataRequest(group, versionId, correlationId, clientId)
- }
-
-}
-
-case class ConsumerMetadataRequest(group: String,
- versionId: Short = ConsumerMetadataRequest.CurrentVersion,
- correlationId: Int = 0,
- clientId: String = ConsumerMetadataRequest.DefaultClientId)
- extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey)) {
-
- def sizeInBytes =
- 2 + /* versionId */
- 4 + /* correlationId */
- ApiUtils.shortStringLength(clientId) +
- ApiUtils.shortStringLength(group)
-
- def writeTo(buffer: ByteBuffer) {
- // envelope
- buffer.putShort(versionId)
- buffer.putInt(correlationId)
- ApiUtils.writeShortString(buffer, clientId)
-
- // consumer metadata request
- ApiUtils.writeShortString(buffer, group)
- }
-
- override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
- // return ConsumerCoordinatorNotAvailable for all uncaught errors
- val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
- requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, errorResponse)))
- }
-
- def describe(details: Boolean) = {
- val consumerMetadataRequest = new StringBuilder
- consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
- consumerMetadataRequest.append("; Version: " + versionId)
- consumerMetadataRequest.append("; CorrelationId: " + correlationId)
- consumerMetadataRequest.append("; ClientId: " + clientId)
- consumerMetadataRequest.append("; Group: " + group)
- consumerMetadataRequest.toString()
- }
-}
\ No newline at end of file