Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/26 21:11:13 UTC
[10/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator
KAFKA-5059: Implement Transactional Coordinator
Author: Damian Guy <da...@gmail.com>
Author: Guozhang Wang <wa...@gmail.com>
Author: Apurva Mehta <ap...@confluent.io>
Reviewers: Guozhang Wang, Jason Gustafson, Apurva Mehta, Jun Rao
Closes #2849 from dguy/exactly-once-tc
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f69d9415
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f69d9415
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f69d9415
Branch: refs/heads/trunk
Commit: f69d94158ce72f4ef2df5b98f2395c2b2e61251e
Parents: 8d8ab2e
Author: Damian Guy <da...@gmail.com>
Authored: Wed Apr 26 14:10:38 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Apr 26 14:10:38 2017 -0700
----------------------------------------------------------------------
checkstyle/import-control-core.xml | 4 +
.../clients/consumer/internals/Fetcher.java | 8 +-
.../errors/ConcurrentTransactionsException.java | 25 +
.../errors/InvalidTxnTimeoutException.java | 1 +
.../apache/kafka/common/protocol/Errors.java | 10 +-
.../kafka/common/requests/InitPidRequest.java | 2 -
.../common/requests/WriteTxnMarkersRequest.java | 41 +
.../common/requests/RequestResponseTest.java | 3 +-
config/server.properties | 6 +-
.../main/scala/kafka/admin/AdminClient.scala | 2 +-
.../main/scala/kafka/cluster/Partition.scala | 4 +-
.../kafka/common/InterBrokerSendThread.scala | 69 ++
core/src/main/scala/kafka/common/Topic.scala | 3 +-
.../kafka/coordinator/DelayedHeartbeat.scala | 40 -
.../scala/kafka/coordinator/DelayedJoin.scala | 44 -
.../kafka/coordinator/GroupCoordinator.scala | 817 -------------
.../scala/kafka/coordinator/GroupMetadata.scala | 327 -----
.../coordinator/GroupMetadataManager.scala | 1122 -----------------
.../kafka/coordinator/MemberMetadata.scala | 123 --
.../scala/kafka/coordinator/OffsetConfig.scala | 61 -
.../kafka/coordinator/ProducerIdManager.scala | 153 ---
.../coordinator/TransactionCoordinator.scala | 92 --
.../coordinator/group/DelayedHeartbeat.scala | 40 +
.../kafka/coordinator/group/DelayedJoin.scala | 43 +
.../coordinator/group/GroupCoordinator.scala | 817 +++++++++++++
.../kafka/coordinator/group/GroupMetadata.scala | 326 +++++
.../group/GroupMetadataManager.scala | 1123 +++++++++++++++++
.../coordinator/group/MemberMetadata.scala | 120 ++
.../kafka/coordinator/group/OffsetConfig.scala | 61 +
.../transaction/DelayedTxnMarker.scala | 48 +
.../transaction/ProducerIdManager.scala | 153 +++
.../transaction/TransactionCoordinator.scala | 434 +++++++
.../transaction/TransactionLog.scala | 275 +++++
.../transaction/TransactionMarkerChannel.scala | 168 +++
.../TransactionMarkerChannelManager.scala | 159 +++
...nsactionMarkerRequestCompletionHandler.scala | 95 ++
.../transaction/TransactionMetadata.scala | 175 +++
.../transaction/TransactionStateManager.scala | 429 +++++++
.../main/scala/kafka/server/AdminManager.scala | 2 +-
.../scala/kafka/server/DelayedOperation.scala | 32 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 194 ++-
.../main/scala/kafka/server/KafkaConfig.scala | 54 +-
.../main/scala/kafka/server/KafkaServer.scala | 7 +-
.../main/scala/kafka/server/MetadataCache.scala | 14 +-
.../scala/kafka/server/ReplicaManager.scala | 6 +-
.../scala/kafka/tools/DumpLogSegments.scala | 2 +-
.../main/scala/kafka/utils/KafkaScheduler.scala | 2 +-
.../scala/kafka/utils/ShutdownableThread.scala | 4 +-
...eListenersWithSameSecurityProtocolTest.scala | 2 +-
.../common/InterBrokerSendThreadTest.scala | 131 ++
.../GroupCoordinatorResponseTest.scala | 1154 ------------------
.../coordinator/GroupCoordinatorTest.scala | 36 -
.../coordinator/GroupMetadataManagerTest.scala | 814 ------------
.../kafka/coordinator/GroupMetadataTest.scala | 360 ------
.../kafka/coordinator/MemberMetadataTest.scala | 88 --
.../coordinator/ProducerIdManagerTest.scala | 105 --
.../TransactionCoordinatorTest.scala | 93 --
.../group/GroupCoordinatorResponseTest.scala | 1154 ++++++++++++++++++
.../group/GroupCoordinatorTest.scala | 36 +
.../group/GroupMetadataManagerTest.scala | 815 +++++++++++++
.../coordinator/group/GroupMetadataTest.scala | 362 ++++++
.../coordinator/group/MemberMetadataTest.scala | 88 ++
.../transaction/ProducerIdManagerTest.scala | 104 ++
.../TransactionCoordinatorIntegrationTest.scala | 98 ++
.../TransactionCoordinatorTest.scala | 787 ++++++++++++
.../transaction/TransactionLogTest.scala | 109 ++
.../TransactionMarkerChannelManagerTest.scala | 283 +++++
.../TransactionMarkerChannelTest.scala | 179 +++
...tionMarkerRequestCompletionHandlerTest.scala | 164 +++
.../TransactionStateManagerTest.scala | 354 ++++++
.../kafka/server/DelayedOperationTest.scala | 34 +-
.../unit/kafka/server/KafkaConfigTest.scala | 8 +-
72 files changed, 9554 insertions(+), 5544 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/checkstyle/import-control-core.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 5714bfd..856df58 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -61,6 +61,10 @@
<allow pkg="org.apache.kafka.clients.consumer" />
</subpackage>
+ <subpackage name="coordinator">
+ <allow class="kafka.server.MetadataCache" />
+ </subpackage>
+
<subpackage name="examples">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="kafka.api" />
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e337f4e..f421dfb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -868,7 +868,13 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
return partitionRecords;
}
- private ConsumerRecord<K, V> parseRecord(TopicPartition partition, RecordBatch batch, Record record) {
+
+ /**
+ * Parse the record entry, deserializing the key / value fields if necessary
+ */
+ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+ RecordBatch batch,
+ Record record) {
try {
long offset = record.offset();
long timestamp = record.timestamp();
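The new Javadoc above refers to the step where raw record bytes become typed key/value objects. A minimal standalone sketch of that step, assuming String data (the deserializers and byte arrays here are illustrative, not part of this patch):

    import org.apache.kafka.common.serialization.StringDeserializer

    val keyDeserializer = new StringDeserializer
    val valueDeserializer = new StringDeserializer
    val keyBytes = "k1".getBytes("UTF-8")
    val valueBytes = "v1".getBytes("UTF-8")
    // Fetcher.parseRecord performs the equivalent per record, wrapping the result
    // in a ConsumerRecord along with offset and timestamp metadata
    val key = keyDeserializer.deserialize("my-topic", keyBytes)
    val value = valueDeserializer.deserialize("my-topic", valueBytes)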
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
new file mode 100644
index 0000000..6ad6b8a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ConcurrentTransactionsException.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ConcurrentTransactionsException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public ConcurrentTransactionsException(final String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
index 12d873e..e5df248 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTxnTimeoutException.java
@@ -31,3 +31,4 @@ public class InvalidTxnTimeoutException extends ApiException {
super(message);
}
}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 375cf16..80e9191 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
+import org.apache.kafka.common.errors.ConcurrentTransactionsException;
import org.apache.kafka.common.errors.ControllerMovedException;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
@@ -38,11 +39,12 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
import org.apache.kafka.common.errors.InvalidReplicationFactorException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidRequiredAcksException;
+import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
import org.apache.kafka.common.errors.InvalidTimestampException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
-import org.apache.kafka.common.errors.InvalidTxnTimeoutException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
@@ -52,7 +54,6 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderForPartitionException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
-import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -186,7 +187,10 @@ public enum Errors {
new InvalidPidMappingException("The PID mapping is invalid")),
INVALID_TRANSACTION_TIMEOUT(50,
new InvalidTxnTimeoutException("The transaction timeout is larger than the maximum value allowed by the broker " +
- "(as configured by max.transaction.timeout.ms)."));
+ "(as configured by max.transaction.timeout.ms).")),
+ CONCURRENT_TRANSACTIONS(51,
+ new ConcurrentTransactionsException("The producer attempted to update a transaction " +
+ "while another concurrent operation on the same transaction was ongoing"));
private static final Logger log = LoggerFactory.getLogger(Errors.class);
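The new entry follows the enum's existing pattern of pairing a numeric error code with a singleton exception. A quick sketch of how a broker response code surfaces client-side, using the enum's existing forCode/exception accessors (the literal 51 matches the entry added above):

    import org.apache.kafka.common.protocol.Errors

    val error = Errors.forCode(51.toShort)            // resolves to CONCURRENT_TRANSACTIONS
    assert(error == Errors.CONCURRENT_TRANSACTIONS)
    println(error.exception().getMessage)             // the message string defined in the enum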
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
index dedbc0f..eff05e2 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -28,7 +28,6 @@ public class InitPidRequest extends AbstractRequest {
private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
private static final String TRANSACTION_TIMEOUT_KEY_NAME = "transaction_timeout_ms";
-
private final String transactionalId;
private final int transactionTimeoutMs;
@@ -63,7 +62,6 @@ public class InitPidRequest extends AbstractRequest {
return "(type=InitPidRequest, transactionalId=" + transactionalId + ", transactionTimeoutMs=" +
transactionTimeoutMs + ")";
}
-
}
public InitPidRequest(Struct struct, short version) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
index fe64603..998d504 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
public class WriteTxnMarkersRequest extends AbstractRequest {
private static final String COORDINATOR_EPOCH_KEY_NAME = "coordinator_epoch";
@@ -67,6 +68,33 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
public List<TopicPartition> partitions() {
return partitions;
}
+
+
+ @Override
+ public String toString() {
+ return "TxnMarkerEntry{" +
+ "pid=" + producerId +
+ ", epoch=" + producerEpoch +
+ ", result=" + result +
+ ", partitions=" + partitions +
+ '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final TxnMarkerEntry that = (TxnMarkerEntry) o;
+ return producerId == that.producerId &&
+ producerEpoch == that.producerEpoch &&
+ result == that.result &&
+ Objects.equals(partitions, that.partitions);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(producerId, producerEpoch, result, partitions);
+ }
}
public static class Builder extends AbstractRequest.Builder<WriteTxnMarkersRequest> {
@@ -183,4 +211,17 @@ public class WriteTxnMarkersRequest extends AbstractRequest {
return new WriteTxnMarkersRequest(ApiKeys.WRITE_TXN_MARKERS.parseRequest(version, buffer), version);
}
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ final WriteTxnMarkersRequest that = (WriteTxnMarkersRequest) o;
+ return coordinatorEpoch == that.coordinatorEpoch &&
+ Objects.equals(markers, that.markers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(coordinatorEpoch, markers);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e41c38e..9e283c0 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -568,7 +568,7 @@ public class RequestResponseTest {
}
private ListGroupsResponse createListGroupsResponse() {
- List<ListGroupsResponse.Group> groups = asList(new ListGroupsResponse.Group("test-group", "consumer"));
+ List<ListGroupsResponse.Group> groups = Collections.singletonList(new ListGroupsResponse.Group("test-group", "consumer"));
return new ListGroupsResponse(Errors.NONE, groups);
}
@@ -844,6 +844,7 @@ public class RequestResponseTest {
return new InitPidResponse(Errors.NONE, 3332, (short) 3);
}
+
private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
Map<TopicPartition, Integer> epochs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index 37b5bb3..da72075 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -72,9 +72,11 @@ num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
-# The replication factor for the group metadata internal topic "__consumer_offsets".
-# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
+# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state".
+# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
offsets.topic.replication.factor=1
+transaction.state.log.replication.factor=1
+transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
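These defaults keep single-broker development setups working; for a production cluster the same stanza would typically raise all three settings, for example (values are illustrative, not from this commit):

    offsets.topic.replication.factor=3
    transaction.state.log.replication.factor=3
    transaction.state.log.min.isr=2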
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4d218c1..7342bbb 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit, Future}
import kafka.admin.AdminClient.DeleteRecordsResult
import kafka.common.KafkaException
-import kafka.coordinator.GroupOverview
+import kafka.coordinator.group.GroupOverview
import kafka.utils.Logging
import org.apache.kafka.clients._
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 72e505d..1eea8dc 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -241,9 +241,9 @@ class Partition(val topic: String,
getReplica(replicaId) match {
case Some(replica) =>
// No need to calculate low watermark if there is no delayed DeleteRecordsRequest
- val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+ val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
replica.updateLogReadResult(logReadResult)
- val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed() > 0) lowWatermarkIfLeader else -1L
+ val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
// check if the LW of the partition has incremented
// since the replica's logStartOffset may have incremented
val leaderLWIncremented = newLeaderLW > oldLeaderLW
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
new file mode 100644
index 0000000..b626954
--- /dev/null
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.common
+
+import kafka.utils.ShutdownableThread
+import org.apache.kafka.clients.{ClientResponse, NetworkClient, RequestCompletionHandler}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.Time
+
+
+/**
+ * Class for an inter-broker send thread that utilizes a non-blocking network client.
+ */
+class InterBrokerSendThread(name: String,
+ networkClient: NetworkClient,
+ requestGenerator: () => Iterable[RequestAndCompletionHandler],
+ time: Time)
+ extends ShutdownableThread(name, isInterruptible = false) {
+
+ override def doWork() {
+ val now = time.milliseconds()
+ var pollTimeout = Long.MaxValue
+
+ val requestsToSend: Iterable[RequestAndCompletionHandler] = requestGenerator()
+
+ for (request: RequestAndCompletionHandler <- requestsToSend) {
+ val destination = Integer.toString(request.destination.id())
+ val completionHandler = request.handler
+ // TODO: Need to check inter broker protocol and error if new request is not supported
+ val clientRequest = networkClient.newClientRequest(destination,
+ request.request,
+ now,
+ true,
+ completionHandler)
+
+ if (networkClient.ready(request.destination, now)) {
+ networkClient.send(clientRequest, now)
+ } else {
+ val disConnectedResponse: ClientResponse = new ClientResponse(clientRequest.makeHeader(request.request.desiredOrLatestVersion()),
+ completionHandler, destination,
+ now /* createdTimeMs */, now /* receivedTimeMs */, true /* disconnected */, null /* versionMismatch */, null /* responseBody */)
+
+ // the poll timeout is the minimum connection delay across any destinations yet to be reached;
+ // otherwise it is infinite
+ pollTimeout = Math.min(pollTimeout, networkClient.connectionDelay(request.destination, now))
+
+ completionHandler.onComplete(disConnectedResponse)
+ }
+ }
+ networkClient.poll(pollTimeout, now)
+ }
+}
+
+case class RequestAndCompletionHandler(destination: Node, request: AbstractRequest.Builder[_ <: AbstractRequest], handler: RequestCompletionHandler)
\ No newline at end of file
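To make the intended usage concrete, a hypothetical wiring sketch follows; `client`, `brokerNode` and `requestBuilder` are placeholders for a configured NetworkClient, a destination Node and an AbstractRequest.Builder, none of which are defined in this commit:

    import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
    import org.apache.kafka.clients.{ClientResponse, RequestCompletionHandler}
    import org.apache.kafka.common.utils.Time

    val handler = new RequestCompletionHandler {
      override def onComplete(response: ClientResponse): Unit =
        println(s"response from broker ${response.destination}")
    }
    // the generator is re-evaluated on every doWork() iteration, so it can drain
    // an internal queue of pending requests each time around the loop
    val sendThread = new InterBrokerSendThread(
      "inter-broker-sender",
      client,
      () => Seq(RequestAndCompletionHandler(brokerNode, requestBuilder, handler)),
      Time.SYSTEM)
    sendThread.start()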
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 4a65afb..6ca7175 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -24,7 +24,8 @@ import scala.collection.immutable
object Topic {
val GroupMetadataTopicName = "__consumer_offsets"
- val InternalTopics = immutable.Set(GroupMetadataTopicName)
+ val TransactionStateTopicName = "__transaction_state"
+ val InternalTopics = immutable.Set(GroupMetadataTopicName, TransactionStateTopicName)
val legalChars = "[a-zA-Z0-9\\._\\-]"
private val maxNameLength = 249
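With this change the transaction log is recognized as a Kafka-internal topic alongside the offsets topic; a one-line check of the new constant (sketch):

    import kafka.common.Topic

    assert(Topic.InternalTopics.contains(Topic.TransactionStateTopicName))  // "__transaction_state"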
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
deleted file mode 100644
index b05186c..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed heartbeat operations that are added to the purgatory for session timeout checking.
- * Heartbeats are paused during rebalance.
- */
-private[coordinator] class DelayedHeartbeat(coordinator: GroupCoordinator,
- group: GroupMetadata,
- member: MemberMetadata,
- heartbeatDeadline: Long,
- sessionTimeout: Long)
- extends DelayedOperation(sessionTimeout) {
-
- // overridden since tryComplete already synchronizes on the group. This makes it safe to
- // call purgatory operations while holding the group lock.
- override def safeTryComplete(): Boolean = tryComplete()
-
- override def tryComplete(): Boolean = coordinator.tryCompleteHeartbeat(group, member, heartbeatDeadline, forceComplete)
- override def onExpiration() = coordinator.onExpireHeartbeat(group, member, heartbeatDeadline)
- override def onComplete() = coordinator.onCompleteHeartbeat()
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
deleted file mode 100644
index 8744f16..0000000
--- a/core/src/main/scala/kafka/coordinator/DelayedJoin.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.server.DelayedOperation
-
-/**
- * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
- *
- * Whenever a join-group request is received, check if all known group members have requested
- * to re-join the group; if yes, complete this operation to proceed rebalance.
- *
- * When the operation has expired, any known members that have not requested to re-join
- * the group are marked as failed, and complete this operation to proceed rebalance with
- * the rest of the group.
- */
-private[coordinator] class DelayedJoin(coordinator: GroupCoordinator,
- group: GroupMetadata,
- rebalanceTimeout: Long)
- extends DelayedOperation(rebalanceTimeout) {
-
- // overridden since tryComplete already synchronizes on the group. This makes it safe to
- // call purgatory operations while holding the group lock.
- override def safeTryComplete(): Boolean = tryComplete()
-
- override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete)
- override def onExpiration() = coordinator.onExpireJoin()
- override def onComplete() = coordinator.onCompleteJoin(group)
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
deleted file mode 100644
index d78d1df..0000000
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ /dev/null
@@ -1,817 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
-
-import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
-import kafka.log.LogConfig
-import kafka.message.ProducerCompressionCodec
-import kafka.server._
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
-
-import scala.collection.{Map, Seq, immutable}
-
-
-/**
- * GroupCoordinator handles general group membership and offset management.
- *
- * Each Kafka server instantiates a coordinator which is responsible for a set of
- * groups. Groups are assigned to coordinators based on their group names.
- */
-class GroupCoordinator(val brokerId: Int,
- val groupConfig: GroupConfig,
- val offsetConfig: OffsetConfig,
- val groupManager: GroupMetadataManager,
- val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
- val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
- time: Time) extends Logging {
- type JoinCallback = JoinGroupResult => Unit
- type SyncCallback = (Array[Byte], Errors) => Unit
-
- this.logIdent = "[GroupCoordinator " + brokerId + "]: "
-
- private val isActive = new AtomicBoolean(false)
-
- def offsetsTopicConfigs: Properties = {
- val props = new Properties
- props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
- props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
- props
- }
-
- /**
- * NOTE: If a group lock and metadataLock are simultaneously needed,
- * be sure to acquire the group lock before metadataLock to prevent deadlock
- */
-
- /**
- * Startup logic executed at the same time when the server starts up.
- */
- def startup(enableMetadataExpiration: Boolean = true) {
- info("Starting up.")
- if (enableMetadataExpiration)
- groupManager.enableMetadataExpiration()
- isActive.set(true)
- info("Startup complete.")
- }
-
- /**
- * Shutdown logic executed at the same time when server shuts down.
- * Ordering of actions should be reversed from the startup process.
- */
- def shutdown() {
- info("Shutting down.")
- isActive.set(false)
- groupManager.shutdown()
- heartbeatPurgatory.shutdown()
- joinPurgatory.shutdown()
- info("Shutdown complete.")
- }
-
- def handleJoinGroup(groupId: String,
- memberId: String,
- clientId: String,
- clientHost: String,
- rebalanceTimeoutMs: Int,
- sessionTimeoutMs: Int,
- protocolType: String,
- protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback) {
- if (!isActive.get) {
- responseCallback(joinError(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
- } else if (!validGroupId(groupId)) {
- responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID))
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(joinError(memberId, Errors.NOT_COORDINATOR))
- } else if (isCoordinatorLoadInProgress(groupId)) {
- responseCallback(joinError(memberId, Errors.COORDINATOR_LOAD_IN_PROGRESS))
- } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
- sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
- responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
- } else {
- // only try to create the group if the group is not unknown AND
- // the member id is UNKNOWN, if member is specified but group does not
- // exist we should reject the request
- groupManager.getGroup(groupId) match {
- case None =>
- if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
- } else {
- val group = groupManager.addGroup(new GroupMetadata(groupId))
- doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
- }
-
- case Some(group) =>
- doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
- }
- }
- }
-
- private def doJoinGroup(group: GroupMetadata,
- memberId: String,
- clientId: String,
- clientHost: String,
- rebalanceTimeoutMs: Int,
- sessionTimeoutMs: Int,
- protocolType: String,
- protocols: List[(String, Array[Byte])],
- responseCallback: JoinCallback) {
- group synchronized {
- if (!group.is(Empty) && (group.protocolType != Some(protocolType) || !group.supportsProtocols(protocols.map(_._1).toSet))) {
- // if the new member does not support the group protocol, reject it
- responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
- } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
- // if the member trying to register with a un-recognized id, send the response to let
- // it reset its member id and retry
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
- } else {
- group.currentState match {
- case Dead =>
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id,
- responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
-
- case PreparingRebalance =>
- if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
- } else {
- val member = group.get(memberId)
- updateMemberAndRebalance(group, member, protocols, responseCallback)
- }
-
- case AwaitingSync =>
- if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
- } else {
- val member = group.get(memberId)
- if (member.matches(protocols)) {
- // member is joining with the same metadata (which could be because it failed to
- // receive the initial JoinGroup response), so just return current group information
- // for the current generation.
- responseCallback(JoinGroupResult(
- members = if (memberId == group.leaderId) {
- group.currentMemberMetadata
- } else {
- Map.empty
- },
- memberId = memberId,
- generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
- error = Errors.NONE))
- } else {
- // member has changed metadata, so force a rebalance
- updateMemberAndRebalance(group, member, protocols, responseCallback)
- }
- }
-
- case Empty | Stable =>
- if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
- // if the member id is unknown, register the member to the group
- addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
- } else {
- val member = group.get(memberId)
- if (memberId == group.leaderId || !member.matches(protocols)) {
- // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
- // The latter allows the leader to trigger rebalances for changes affecting assignment
- // which do not affect the member metadata (such as topic metadata changes for the consumer)
- updateMemberAndRebalance(group, member, protocols, responseCallback)
- } else {
- // for followers with no actual change to their metadata, just return group information
- // for the current generation which will allow them to issue SyncGroup
- responseCallback(JoinGroupResult(
- members = Map.empty,
- memberId = memberId,
- generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
- error = Errors.NONE))
- }
- }
- }
-
- if (group.is(PreparingRebalance))
- joinPurgatory.checkAndComplete(GroupKey(group.groupId))
- }
- }
- }
-
- def handleSyncGroup(groupId: String,
- generation: Int,
- memberId: String,
- groupAssignment: Map[String, Array[Byte]],
- responseCallback: SyncCallback) {
- if (!isActive.get) {
- responseCallback(Array.empty, Errors.COORDINATOR_NOT_AVAILABLE)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Array.empty, Errors.NOT_COORDINATOR)
- } else {
- groupManager.getGroup(groupId) match {
- case None => responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
- case Some(group) => doSyncGroup(group, generation, memberId, groupAssignment, responseCallback)
- }
- }
- }
-
- private def doSyncGroup(group: GroupMetadata,
- generationId: Int,
- memberId: String,
- groupAssignment: Map[String, Array[Byte]],
- responseCallback: SyncCallback) {
- var delayedGroupStore: Option[DelayedStore] = None
-
- group synchronized {
- if (!group.has(memberId)) {
- responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
- } else if (generationId != group.generationId) {
- responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
- } else {
- group.currentState match {
- case Empty | Dead =>
- responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
-
- case PreparingRebalance =>
- responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
-
- case AwaitingSync =>
- group.get(memberId).awaitingSyncCallback = responseCallback
-
- // if this is the leader, then we can attempt to persist state and transition to stable
- if (memberId == group.leaderId) {
- info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
-
- // fill any missing members with an empty assignment
- val missing = group.allMembers -- groupAssignment.keySet
- val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
-
- delayedGroupStore = groupManager.prepareStoreGroup(group, assignment, (error: Errors) => {
- group synchronized {
- // another member may have joined the group while we were awaiting this callback,
- // so we must ensure we are still in the AwaitingSync state and the same generation
- // when it gets invoked. if we have transitioned to another state, then do nothing
- if (group.is(AwaitingSync) && generationId == group.generationId) {
- if (error != Errors.NONE) {
- resetAndPropagateAssignmentError(group, error)
- maybePrepareRebalance(group)
- } else {
- setAndPropagateAssignment(group, assignment)
- group.transitionTo(Stable)
- }
- }
- }
- })
- }
-
- case Stable =>
- // if the group is stable, we just return the current assignment
- val memberMetadata = group.get(memberId)
- responseCallback(memberMetadata.assignment, Errors.NONE)
- completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
- }
- }
- }
-
- // store the group metadata without holding the group lock to avoid the potential
- // for deadlock if the callback is invoked holding other locks (e.g. the replica
- // state change lock)
- delayedGroupStore.foreach(groupManager.store)
- }
-
- def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit) {
- if (!isActive.get) {
- responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Errors.NOT_COORDINATOR)
- } else if (isCoordinatorLoadInProgress(groupId)) {
- responseCallback(Errors.COORDINATOR_LOAD_IN_PROGRESS)
- } else {
- groupManager.getGroup(groupId) match {
- case None =>
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
- // joining without specified consumer id,
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
- case Some(group) =>
- group synchronized {
- if (group.is(Dead) || !group.has(memberId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
- } else {
- val member = group.get(memberId)
- removeHeartbeatForLeavingMember(group, member)
- onMemberFailure(group, member)
- responseCallback(Errors.NONE)
- }
- }
- }
- }
- }
-
- def handleHeartbeat(groupId: String,
- memberId: String,
- generationId: Int,
- responseCallback: Errors => Unit) {
- if (!isActive.get) {
- responseCallback(Errors.COORDINATOR_NOT_AVAILABLE)
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(Errors.NOT_COORDINATOR)
- } else if (isCoordinatorLoadInProgress(groupId)) {
- // the group is still loading, so respond just blindly
- responseCallback(Errors.NONE)
- } else {
- groupManager.getGroup(groupId) match {
- case None =>
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
- case Some(group) =>
- group synchronized {
- group.currentState match {
- case Dead =>
- // if the group is marked as dead, it means some other thread has just removed the group
- // from the coordinator metadata; this is likely that the group has migrated to some other
- // coordinator OR the group is in a transient unstable phase. Let the member retry
- // joining without the specified member id,
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
- case Empty =>
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
-
- case AwaitingSync =>
- if (!group.has(memberId))
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
- else
- responseCallback(Errors.REBALANCE_IN_PROGRESS)
-
- case PreparingRebalance =>
- if (!group.has(memberId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
- } else if (generationId != group.generationId) {
- responseCallback(Errors.ILLEGAL_GENERATION)
- } else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- responseCallback(Errors.REBALANCE_IN_PROGRESS)
- }
-
- case Stable =>
- if (!group.has(memberId)) {
- responseCallback(Errors.UNKNOWN_MEMBER_ID)
- } else if (generationId != group.generationId) {
- responseCallback(Errors.ILLEGAL_GENERATION)
- } else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- responseCallback(Errors.NONE)
- }
- }
- }
- }
- }
- }
-
- def handleCommitOffsets(groupId: String,
- memberId: String,
- generationId: Int,
- offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
- if (!isActive.get) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_NOT_AVAILABLE))
- } else if (!isCoordinatorForGroup(groupId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR))
- } else if (isCoordinatorLoadInProgress(groupId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.COORDINATOR_LOAD_IN_PROGRESS))
- } else {
- groupManager.getGroup(groupId) match {
- case None =>
- if (generationId < 0) {
- // the group is not relying on Kafka for group management, so allow the commit
- val group = groupManager.addGroup(new GroupMetadata(groupId))
- doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
- } else {
- // or this is a request coming from an older generation. either way, reject the commit
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
- }
-
- case Some(group) =>
- doCommitOffsets(group, memberId, generationId, offsetMetadata, responseCallback)
- }
- }
- }
-
- private def doCommitOffsets(group: GroupMetadata,
- memberId: String,
- generationId: Int,
- offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
- var delayedOffsetStore: Option[DelayedStore] = None
-
- group synchronized {
- if (group.is(Dead)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
- } else if (generationId < 0 && group.is(Empty)) {
- // the group is only using Kafka to store offsets
- delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
- offsetMetadata, responseCallback)
- } else if (group.is(AwaitingSync)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
- } else if (!group.has(memberId)) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
- } else if (generationId != group.generationId) {
- responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
- } else {
- val member = group.get(memberId)
- completeAndScheduleNextHeartbeatExpiration(group, member)
- delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
- offsetMetadata, responseCallback)
- }
- }
-
- // store the offsets without holding the group lock
- delayedOffsetStore.foreach(groupManager.store)
- }
-
- def handleFetchOffsets(groupId: String,
- partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
- if (!isActive.get)
- (Errors.COORDINATOR_NOT_AVAILABLE, Map())
- else if (!isCoordinatorForGroup(groupId)) {
- debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId))
- (Errors.NOT_COORDINATOR, Map())
- } else if (isCoordinatorLoadInProgress(groupId))
- (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map())
- else {
- // return offsets blindly regardless the current group state since the group may be using
- // Kafka commit storage without automatic group management
- (Errors.NONE, groupManager.getOffsets(groupId, partitions))
- }
- }
-
- def handleListGroups(): (Errors, List[GroupOverview]) = {
- if (!isActive.get) {
- (Errors.COORDINATOR_NOT_AVAILABLE, List[GroupOverview]())
- } else {
- val errorCode = if (groupManager.isLoading) Errors.COORDINATOR_LOAD_IN_PROGRESS else Errors.NONE
- (errorCode, groupManager.currentGroups.map(_.overview).toList)
- }
- }
-
- def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
- if (!isActive.get) {
- (Errors.COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
- } else if (!isCoordinatorForGroup(groupId)) {
- (Errors.NOT_COORDINATOR, GroupCoordinator.EmptyGroup)
- } else if (isCoordinatorLoadInProgress(groupId)) {
- (Errors.COORDINATOR_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
- } else {
- groupManager.getGroup(groupId) match {
- case None => (Errors.NONE, GroupCoordinator.DeadGroup)
- case Some(group) =>
- group synchronized {
- (Errors.NONE, group.summary)
- }
- }
- }
- }
-
- def handleDeletedPartitions(topicPartitions: Seq[TopicPartition]) {
- groupManager.cleanupGroupMetadata(Some(topicPartitions))
- }
-
- private def onGroupUnloaded(group: GroupMetadata) {
- group synchronized {
- info(s"Unloading group metadata for ${group.groupId} with generation ${group.generationId}")
- val previousState = group.currentState
- group.transitionTo(Dead)
-
- previousState match {
- case Empty | Dead =>
- case PreparingRebalance =>
- for (member <- group.allMemberMetadata) {
- if (member.awaitingJoinCallback != null) {
- member.awaitingJoinCallback(joinError(member.memberId, Errors.NOT_COORDINATOR))
- member.awaitingJoinCallback = null
- }
- }
- joinPurgatory.checkAndComplete(GroupKey(group.groupId))
-
- case Stable | AwaitingSync =>
- for (member <- group.allMemberMetadata) {
- if (member.awaitingSyncCallback != null) {
- member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR)
- member.awaitingSyncCallback = null
- }
- heartbeatPurgatory.checkAndComplete(MemberKey(member.groupId, member.memberId))
- }
- }
- }
- }
-
- private def onGroupLoaded(group: GroupMetadata) {
- group synchronized {
- info(s"Loading group metadata for ${group.groupId} with generation ${group.generationId}")
- assert(group.is(Stable) || group.is(Empty))
- group.allMemberMetadata.foreach(completeAndScheduleNextHeartbeatExpiration(group, _))
- }
- }
-
- def handleGroupImmigration(offsetTopicPartitionId: Int) {
- groupManager.loadGroupsForPartition(offsetTopicPartitionId, onGroupLoaded)
- }
-
- def handleGroupEmigration(offsetTopicPartitionId: Int) {
- groupManager.removeGroupsForPartition(offsetTopicPartitionId, onGroupUnloaded)
- }
-
- private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
- assert(group.is(AwaitingSync))
- group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
- propagateAssignment(group, Errors.NONE)
- }
-
- private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) {
- assert(group.is(AwaitingSync))
- group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte])
- propagateAssignment(group, error)
- }
-
- private def propagateAssignment(group: GroupMetadata, error: Errors) {
- for (member <- group.allMemberMetadata) {
- if (member.awaitingSyncCallback != null) {
- member.awaitingSyncCallback(member.assignment, error)
- member.awaitingSyncCallback = null
-
- // reset the session timeout for members after propagating the member's assignment.
- // This is because if any member's session expired while we were still awaiting either
- // the leader sync group or the storage callback, its expiration will be ignored and no
- // future heartbeat expectations will not be scheduled.
- completeAndScheduleNextHeartbeatExpiration(group, member)
- }
- }
- }
-
- private def validGroupId(groupId: String): Boolean = {
- groupId != null && !groupId.isEmpty
- }
-
- private def joinError(memberId: String, error: Errors): JoinGroupResult = {
- JoinGroupResult(
- members = Map.empty,
- memberId = memberId,
- generationId = 0,
- subProtocol = GroupCoordinator.NoProtocol,
- leaderId = GroupCoordinator.NoLeader,
- error = error)
- }
-
- /**
- * Complete existing DelayedHeartbeats for the given member and schedule the next one
- */
- private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata) {
- // complete current heartbeat expectation
- member.latestHeartbeat = time.milliseconds()
- val memberKey = MemberKey(member.groupId, member.memberId)
- heartbeatPurgatory.checkAndComplete(memberKey)
-
- // reschedule the next heartbeat expiration deadline
- val newHeartbeatDeadline = member.latestHeartbeat + member.sessionTimeoutMs
- val delayedHeartbeat = new DelayedHeartbeat(this, group, member, newHeartbeatDeadline, member.sessionTimeoutMs)
- heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
- }
-
- private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata) {
- member.isLeaving = true
- val memberKey = MemberKey(member.groupId, member.memberId)
- heartbeatPurgatory.checkAndComplete(memberKey)
- }
-
- private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
- sessionTimeoutMs: Int,
- clientId: String,
- clientHost: String,
- protocolType: String,
- protocols: List[(String, Array[Byte])],
- group: GroupMetadata,
- callback: JoinCallback) = {
- // use the client-id with a random id suffix as the member-id
- val memberId = clientId + "-" + group.generateMemberIdSuffix
- val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
- sessionTimeoutMs, protocolType, protocols)
- member.awaitingJoinCallback = callback
- group.add(member)
- maybePrepareRebalance(group)
- member
- }
-
- private def updateMemberAndRebalance(group: GroupMetadata,
- member: MemberMetadata,
- protocols: List[(String, Array[Byte])],
- callback: JoinCallback) {
- member.supportedProtocols = protocols
- member.awaitingJoinCallback = callback
- maybePrepareRebalance(group)
- }
-
- private def maybePrepareRebalance(group: GroupMetadata) {
- group synchronized {
- if (group.canRebalance)
- prepareRebalance(group)
- }
- }
-
- private def prepareRebalance(group: GroupMetadata) {
- // if any members are awaiting sync, cancel their request and have them rejoin
- if (group.is(AwaitingSync))
- resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
-
- group.transitionTo(PreparingRebalance)
- info("Preparing to restabilize group %s with old generation %s".format(group.groupId, group.generationId))
-
- val rebalanceTimeout = group.rebalanceTimeoutMs
- val delayedRebalance = new DelayedJoin(this, group, rebalanceTimeout)
- val groupKey = GroupKey(group.groupId)
- joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
- }
-
- private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
- trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
- group.remove(member.memberId)
- group.currentState match {
- case Dead | Empty =>
- case Stable | AwaitingSync => maybePrepareRebalance(group)
- case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
- }
- }
-
- def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
- group synchronized {
- if (group.notYetRejoinedMembers.isEmpty)
- forceComplete()
- else false
- }
- }
-
- def onExpireJoin() {
- // TODO: add metrics for restabilize timeouts
- }
-
- def onCompleteJoin(group: GroupMetadata) {
- var delayedStore: Option[DelayedStore] = None
- group synchronized {
- // remove any members who haven't joined the group yet
- group.notYetRejoinedMembers.foreach { failedMember =>
- group.remove(failedMember.memberId)
- // TODO: cut the socket connection to the client
- }
-
- if (!group.is(Dead)) {
- group.initNextGeneration()
- if (group.is(Empty)) {
- info(s"Group ${group.groupId} with generation ${group.generationId} is now empty")
-
- delayedStore = groupManager.prepareStoreGroup(group, Map.empty, error => {
- if (error != Errors.NONE) {
- // we failed to write the empty group metadata. If the broker fails before another rebalance,
- // the previous generation written to the log will become active again (and most likely timeout).
- // This should be safe since there are no active members in an empty generation, so we just warn.
- warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
- }
- })
- } else {
- info(s"Stabilized group ${group.groupId} generation ${group.generationId}")
-
- // trigger the awaiting join group response callback for all the members after rebalancing
- for (member <- group.allMemberMetadata) {
- assert(member.awaitingJoinCallback != null)
- val joinResult = JoinGroupResult(
- members = if (member.memberId == group.leaderId) { group.currentMemberMetadata } else { Map.empty },
- memberId = member.memberId,
- generationId = group.generationId,
- subProtocol = group.protocol,
- leaderId = group.leaderId,
- error = Errors.NONE)
-
- member.awaitingJoinCallback(joinResult)
- member.awaitingJoinCallback = null
- completeAndScheduleNextHeartbeatExpiration(group, member)
- }
- }
- }
- }
-
- // call without holding the group lock
- delayedStore.foreach(groupManager.store)
- }
-
- def tryCompleteHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long, forceComplete: () => Boolean) = {
- group synchronized {
- if (shouldKeepMemberAlive(member, heartbeatDeadline) || member.isLeaving)
- forceComplete()
- else false
- }
- }
-
- def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
- group synchronized {
- if (!shouldKeepMemberAlive(member, heartbeatDeadline))
- onMemberFailure(group, member)
- }
- }
-
- def onCompleteHeartbeat() {
- // TODO: add metrics for complete heartbeats
- }
-
- def partitionFor(group: String): Int = groupManager.partitionFor(group)
-
- private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
- member.awaitingJoinCallback != null ||
- member.awaitingSyncCallback != null ||
- member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
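
shouldKeepMemberAlive encodes the liveness rule behind heartbeat expiration: a member counts as alive at a given deadline if it has an in-flight join or sync callback, or if its last heartbeat plus its session timeout still covers the deadline. A runnable sketch of that predicate, with simplified fields standing in for MemberMetadata:

    object HeartbeatSketch {
      // Simplified stand-in for MemberMetadata.
      final case class Member(latestHeartbeatMs: Long,
                              sessionTimeoutMs: Int,
                              awaitingJoin: Boolean,
                              awaitingSync: Boolean)

      def shouldKeepAlive(m: Member, deadlineMs: Long): Boolean =
        m.awaitingJoin || m.awaitingSync ||
          m.latestHeartbeatMs + m.sessionTimeoutMs > deadlineMs

      def main(args: Array[String]): Unit = {
        val m = Member(latestHeartbeatMs = 1000L, sessionTimeoutMs = 30000,
                       awaitingJoin = false, awaitingSync = false)
        println(shouldKeepAlive(m, deadlineMs = 20000L)) // true: 1000 + 30000 > 20000
        println(shouldKeepAlive(m, deadlineMs = 40000L)) // false: session timed out
      }
    }
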
-
- private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
-
- private def isCoordinatorLoadInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
-}
-
-object GroupCoordinator {
-
- val NoState = ""
- val NoProtocolType = ""
- val NoProtocol = ""
- val NoLeader = ""
- val NoMembers = List[MemberSummary]()
- val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
- val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
-
- def apply(config: KafkaConfig,
- zkUtils: ZkUtils,
- replicaManager: ReplicaManager,
- time: Time): GroupCoordinator = {
- val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
- val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
- apply(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, time)
- }
-
- private[coordinator] def offsetConfig(config: KafkaConfig) = OffsetConfig(
- maxMetadataSize = config.offsetMetadataMaxSize,
- loadBufferSize = config.offsetsLoadBufferSize,
- offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
- offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
- offsetsTopicNumPartitions = config.offsetsTopicPartitions,
- offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
- offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
- offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
- offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
- offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
- )
-
- def apply(config: KafkaConfig,
- zkUtils: ZkUtils,
- replicaManager: ReplicaManager,
- heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
- joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
- time: Time): GroupCoordinator = {
- val offsetConfig = this.offsetConfig(config)
- val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
- groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
-
- val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
- offsetConfig, replicaManager, zkUtils, time)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
- }
-
-}
-
-case class GroupConfig(groupMinSessionTimeoutMs: Int,
- groupMaxSessionTimeoutMs: Int)
-
-case class JoinGroupResult(members: Map[String, Array[Byte]],
- memberId: String,
- generationId: Int,
- subProtocol: String,
- leaderId: String,
- error: Errors)
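
The companion object above follows a common factory pattern: the convenience apply() constructs the heavyweight dependencies (the heartbeat and join purgatories) and delegates to a second apply() that accepts them directly, so tests can inject fakes. A self-contained sketch of that shape, with placeholder types rather than Kafka's real classes:

    object FactorySketch {
      // Placeholder stand-ins for Kafka's purgatory and coordinator types.
      final class Purgatory(val name: String, val brokerId: Int)
      final class Coordinator(val brokerId: Int,
                              val heartbeat: Purgatory,
                              val join: Purgatory)

      object Coordinator {
        // Production entry point: builds the dependencies itself.
        def apply(brokerId: Int): Coordinator =
          apply(brokerId,
            heartbeat = new Purgatory("Heartbeat", brokerId),
            join = new Purgatory("Rebalance", brokerId))

        // Test-friendly entry point: dependencies are injected.
        def apply(brokerId: Int, heartbeat: Purgatory, join: Purgatory): Coordinator =
          new Coordinator(brokerId, heartbeat, join)
      }

      def main(args: Array[String]): Unit =
        println(Coordinator(brokerId = 0).join.name) // Rebalance
    }
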
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
deleted file mode 100644
index 4ea5bdd..0000000
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import collection.{Seq, mutable, immutable}
-
-import java.util.UUID
-
-import kafka.common.OffsetAndMetadata
-import kafka.utils.nonthreadsafe
-import org.apache.kafka.common.TopicPartition
-
-private[coordinator] sealed trait GroupState { def state: Byte }
-
-/**
- * Group is preparing to rebalance
- *
- * action: respond to heartbeats with REBALANCE_IN_PROGRESS
- * respond to sync group with REBALANCE_IN_PROGRESS
- * remove member on leave group request
- * park join group requests from new or existing members until all expected members have joined
- * allow offset commits from previous generation
- * allow offset fetch requests
- * transition: some members have joined by the timeout => AwaitingSync
- * all members have left the group => Empty
- * group is removed by partition emigration => Dead
- */
-private[coordinator] case object PreparingRebalance extends GroupState { val state: Byte = 1 }
-
-/**
- * Group is awaiting state assignment from the leader
- *
- * action: respond to heartbeats with REBALANCE_IN_PROGRESS
- * respond to offset commits with REBALANCE_IN_PROGRESS
- * park sync group requests from followers until transition to Stable
- * allow offset fetch requests
- * transition: sync group with state assignment received from leader => Stable
- * join group from new member or existing member with updated metadata => PreparingRebalance
- * leave group from existing member => PreparingRebalance
- * member failure detected => PreparingRebalance
- * group is removed by partition emigration => Dead
- */
-private[coordinator] case object AwaitingSync extends GroupState { val state: Byte = 5 }
-
-/**
- * Group is stable
- *
- * action: respond to member heartbeats normally
- * respond to sync group from any member with current assignment
- * respond to join group from followers with matching metadata with current group metadata
- * allow offset commits from member of current generation
- * allow offset fetch requests
- * transition: member failure detected via heartbeat => PreparingRebalance
- * leave group from existing member => PreparingRebalance
- * leader join-group received => PreparingRebalance
- * follower join-group with new metadata => PreparingRebalance
- * group is removed by partition emigration => Dead
- */
-private[coordinator] case object Stable extends GroupState { val state: Byte = 3 }
-
-/**
- * Group has no more members and its metadata is being removed
- *
- * action: respond to join group with UNKNOWN_MEMBER_ID
- * respond to sync group with UNKNOWN_MEMBER_ID
- * respond to heartbeat with UNKNOWN_MEMBER_ID
- * respond to leave group with UNKNOWN_MEMBER_ID
- * respond to offset commit with UNKNOWN_MEMBER_ID
- * allow offset fetch requests
- * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions
- */
-private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
-
-/**
- * Group has no more members, but lingers until all offsets have expired. This state
- * also represents groups which use Kafka only for offset commits and have no members.
- *
- * action: respond normally to join group from new members
- * respond to sync group with UNKNOWN_MEMBER_ID
- * respond to heartbeat with UNKNOWN_MEMBER_ID
- * respond to leave group with UNKNOWN_MEMBER_ID
- * respond to offset commit with UNKNOWN_MEMBER_ID
- * allow offset fetch requests
- * transition: last offsets removed in periodic expiration task => Dead
- * join group from a new member => PreparingRebalance
- * group is removed by partition emigration => Dead
- * group is removed by expiration => Dead
- */
-private[coordinator] case object Empty extends GroupState { val state: Byte = 5 }
-
-
-private object GroupMetadata {
- private val validPreviousStates: Map[GroupState, Set[GroupState]] =
- Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
- AwaitingSync -> Set(PreparingRebalance),
- Stable -> Set(AwaitingSync),
- PreparingRebalance -> Set(Stable, AwaitingSync, Empty),
- Empty -> Set(PreparingRebalance))
-}
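
validPreviousStates is the entire transition table: a move to a target state is legal only if the current state appears in the target's set of valid predecessors (see transitionTo and assertValidTransition further down). A standalone restatement with plain strings in place of the GroupState case objects:

    object TransitionSketch {
      // Strings stand in for the GroupState case objects (illustrative only).
      val validPreviousStates: Map[String, Set[String]] = Map(
        "Dead"               -> Set("Stable", "PreparingRebalance", "AwaitingSync", "Empty", "Dead"),
        "AwaitingSync"       -> Set("PreparingRebalance"),
        "Stable"             -> Set("AwaitingSync"),
        "PreparingRebalance" -> Set("Stable", "AwaitingSync", "Empty"),
        "Empty"              -> Set("PreparingRebalance"))

      def canTransition(current: String, target: String): Boolean =
        validPreviousStates(target).contains(current)

      def main(args: Array[String]): Unit = {
        println(canTransition("PreparingRebalance", "AwaitingSync")) // true
        println(canTransition("Stable", "AwaitingSync"))             // false: must rebalance first
      }
    }
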
-
-/**
- * Case class used to represent group metadata for the ListGroups API
- */
-case class GroupOverview(groupId: String,
- protocolType: String)
-
-/**
- * Case class used to represent group metadata for the DescribeGroup API
- */
-case class GroupSummary(state: String,
- protocolType: String,
- protocol: String,
- members: List[MemberSummary])
-
-/**
- * Group contains the following metadata:
- *
- * Membership metadata:
- * 1. Members registered in this group
- * 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
- * 3. Protocol metadata associated with group members
- *
- * State metadata:
- * 1. group state
- * 2. generation id
- * 3. leader id
- */
-@nonthreadsafe
-private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
-
- private var state: GroupState = initialState
- private val members = new mutable.HashMap[String, MemberMetadata]
- private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
- private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-
- var protocolType: Option[String] = None
- var generationId = 0
- var leaderId: String = null
- var protocol: String = null
-
- def is(groupState: GroupState) = state == groupState
- def not(groupState: GroupState) = state != groupState
- def has(memberId: String) = members.contains(memberId)
- def get(memberId: String) = members(memberId)
-
- def add(member: MemberMetadata) {
- if (members.isEmpty)
- this.protocolType = Some(member.protocolType)
-
- assert(groupId == member.groupId)
- assert(this.protocolType.orNull == member.protocolType)
- assert(supportsProtocols(member.protocols))
-
- if (leaderId == null)
- leaderId = member.memberId
- members.put(member.memberId, member)
- }
-
- def remove(memberId: String) {
- members.remove(memberId)
- if (memberId == leaderId) {
- leaderId = if (members.isEmpty) {
- null
- } else {
- members.keys.head
- }
- }
- }
-
- def currentState = state
-
- def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
-
- def allMembers = members.keySet
-
- def allMemberMetadata = members.values.toList
-
- def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>
- timeout.max(member.rebalanceTimeoutMs)
- }
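
The group-wide rebalance timeout is simply the maximum of the members' individual timeouts, so the slowest member sets the budget for the whole rebalance. A one-line equivalent with hypothetical example values:

    object RebalanceTimeoutSketch {
      def main(args: Array[String]): Unit = {
        val memberTimeoutsMs = List(30000, 60000, 45000) // hypothetical per-member values
        println(memberTimeoutsMs.foldLeft(0)(_ max _))   // 60000
      }
    }
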
-
- // TODO: decide if ids should be predictable or random
- def generateMemberIdSuffix = UUID.randomUUID().toString
-
- def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)
-
- def transitionTo(groupState: GroupState) {
- assertValidTransition(groupState)
- state = groupState
- }
-
- def selectProtocol: String = {
- if (members.isEmpty)
- throw new IllegalStateException("Cannot select protocol for empty group")
-
- // select the protocol for this group which is supported by all members
- val candidates = candidateProtocols
-
- // let each member vote for one of the protocols and choose the one with the most votes
- val votes: List[(String, Int)] = allMemberMetadata
- .map(_.vote(candidates))
- .groupBy(identity)
- .mapValues(_.size)
- .toList
-
- votes.maxBy(_._2)._1
- }
-
- private def candidateProtocols = {
- // get the set of protocols that are commonly supported by all members
- allMemberMetadata
- .map(_.protocols)
- .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
- }
-
- def supportsProtocols(memberProtocols: Set[String]) = {
- members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
- }
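
selectProtocol and candidateProtocols together implement a small election: intersect every member's supported protocol set, let each member vote for its most-preferred surviving candidate, and pick the plurality winner. A runnable sketch with hypothetical member preference lists:

    object ProtocolVoteSketch {
      // Each member lists its protocols in preference order (example data only).
      val memberPreferences: List[List[String]] = List(
        List("range", "roundrobin"),
        List("roundrobin", "range"),
        List("roundrobin", "range"))

      def selectProtocol: String = {
        // Protocols supported by every member.
        val candidates: Set[String] = memberPreferences.map(_.toSet).reduceLeft(_ & _)
        // Each member votes for its first supported candidate; plurality wins.
        val votes = memberPreferences
          .map(prefs => prefs.find(candidates.contains).get)
          .groupBy(identity)
          .map { case (protocol, ballots) => protocol -> ballots.size }
        votes.maxBy(_._2)._1
      }

      def main(args: Array[String]): Unit =
        println(selectProtocol) // roundrobin: wins two votes to one
    }
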
-
- def initNextGeneration() = {
- assert(notYetRejoinedMembers == List.empty[MemberMetadata])
- if (members.nonEmpty) {
- generationId += 1
- protocol = selectProtocol
- transitionTo(AwaitingSync)
- } else {
- generationId += 1
- protocol = null
- transitionTo(Empty)
- }
- }
-
- def currentMemberMetadata: Map[String, Array[Byte]] = {
- if (is(Dead) || is(PreparingRebalance))
- throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state))
- members.map { case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol)) }.toMap
- }
-
- def summary: GroupSummary = {
- if (is(Stable)) {
- val members = this.members.values.map { member => member.summary(protocol) }.toList
- GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members)
- } else {
- val members = this.members.values.map { member => member.summaryNoMetadata() }.toList
- GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members)
- }
- }
-
- def overview: GroupOverview = {
- GroupOverview(groupId, protocolType.getOrElse(""))
- }
-
- def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata]) {
- this.offsets ++= offsets
- }
-
- def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
- if (pendingOffsetCommits.contains(topicPartition))
- offsets.put(topicPartition, offset)
-
- pendingOffsetCommits.get(topicPartition) match {
- case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
- case _ =>
- }
- }
-
- def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = {
- pendingOffsetCommits.get(topicPartition) match {
- case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition)
- case _ =>
- }
- }
-
- def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]) {
- pendingOffsetCommits ++= offsets
- }
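
prepareOffsetCommit, completePendingOffsetWrite, and failPendingOffsetWrite form a two-phase commit for offsets: a commit is first staged as pending, then either materialized into the offset map when the log append succeeds or discarded when it fails. A standalone sketch of the life cycle, with strings standing in for TopicPartition and a bare Long for OffsetAndMetadata:

    import scala.collection.mutable

    object PendingCommitSketch {
      val offsets = mutable.HashMap.empty[String, Long]
      val pending = mutable.HashMap.empty[String, Long]

      def prepare(tp: String, offset: Long): Unit = pending += tp -> offset

      // On successful append: materialize, then clear the matching staged entry.
      def complete(tp: String, offset: Long): Unit = {
        if (pending.contains(tp)) offsets += tp -> offset
        if (pending.get(tp).contains(offset)) pending.remove(tp)
      }

      // On failed append: just drop the matching staged entry.
      def fail(tp: String, offset: Long): Unit =
        if (pending.get(tp).contains(offset)) pending.remove(tp)

      def main(args: Array[String]): Unit = {
        prepare("t-0", 100L); complete("t-0", 100L)
        prepare("t-1", 200L); fail("t-1", 200L)
        println(offsets) // Map(t-0 -> 100)
        println(pending) // Map()
      }
    }
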
-
- def removeOffsets(topicPartitions: Seq[TopicPartition]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
- topicPartitions.flatMap { topicPartition =>
- pendingOffsetCommits.remove(topicPartition)
- val removedOffset = offsets.remove(topicPartition)
- removedOffset.map(topicPartition -> _)
- }.toMap
- }
-
- def removeExpiredOffsets(startMs: Long) = {
- val expiredOffsets = offsets.filter {
- case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
- }
- offsets --= expiredOffsets.keySet
- expiredOffsets.toMap
- }
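
removeExpiredOffsets purges an offset only when its expiry timestamp has passed and no commit for that partition is currently in flight; the pending set protects in-progress commits from being expired out from under them. A runnable sketch of the rule with simplified stand-in types:

    import scala.collection.mutable

    object OffsetExpirySketch {
      // Simplified stand-in for OffsetAndMetadata; strings stand in for TopicPartition.
      final case class Offset(value: Long, expireTimestampMs: Long)

      val offsets = mutable.HashMap(
        "t-0" -> Offset(42L, expireTimestampMs = 1000L),
        "t-1" -> Offset(7L,  expireTimestampMs = 5000L))
      val pendingCommits = mutable.Set("t-1")

      def removeExpired(nowMs: Long): Map[String, Offset] = {
        val expired = offsets.filter { case (tp, o) =>
          o.expireTimestampMs < nowMs && !pendingCommits.contains(tp)
        }
        offsets --= expired.keys
        expired.toMap
      }

      def main(args: Array[String]): Unit =
        println(removeExpired(nowMs = 6000L).keySet) // Set(t-0): t-1 is protected by its pending commit
    }
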
-
- def allOffsets = offsets.toMap
-
- def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
-
- def numOffsets = offsets.size
-
- def hasOffsets = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
-
- private def assertValidTransition(targetState: GroupState) {
- if (!GroupMetadata.validPreviousStates(targetState).contains(state))
- throw new IllegalStateException("Group %s should be in the %s states before moving to %s state. Instead it is in %s state"
- .format(groupId, GroupMetadata.validPreviousStates(targetState).mkString(","), targetState, state))
- }
-
- override def toString = {
- "[%s,%s,%s,%s]".format(groupId, protocolType, currentState.toString, members)
- }
-}
-