You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2023/02/09 08:13:38 UTC

[kafka] branch trunk updated: KAFKA-14391; Add ConsumerGroupHeartbeat API (#12972)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3be7f7d611d KAFKA-14391; Add ConsumerGroupHeartbeat API (#12972)
3be7f7d611d is described below

commit 3be7f7d611d0786f2f98159d5c7492b0d94a2bb7
Author: David Jacot <dj...@confluent.io>
AuthorDate: Thu Feb 9 09:13:31 2023 +0100

    KAFKA-14391; Add ConsumerGroupHeartbeat API (#12972)
    
    This patch does a few things:
    1) It introduces a new flag to the request spec: `latestVersionUnstable`. It signifies that the last version of the API is considered unstable (or still in development). As such, the last API version is not exposed by the server unless specified otherwise with the new internal `unstable.api.versions.enable`. This allows us to commit new APIs which are still in development.
    3) It adds the ConsumerGroupHeartbeat API, part of KIP-848, and marks it as unreleased for now.
    4) It adds the new error codes required by the new ConsumerGroupHeartbeat API.
    
    Reviewers: Justine Olshan <jo...@confluent.io>, Jeff Kim <je...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../common/errors/FencedMemberEpochException.java  | 26 ++++++++
 .../errors/UnreleasedInstanceIdException.java      | 26 ++++++++
 .../errors/UnsupportedAssignorException.java       | 26 ++++++++
 .../org/apache/kafka/common/protocol/ApiKeys.java  | 36 ++++++++++-
 .../org/apache/kafka/common/protocol/Errors.java   |  8 ++-
 .../kafka/common/requests/AbstractRequest.java     |  2 +
 .../kafka/common/requests/AbstractResponse.java    |  2 +
 .../kafka/common/requests/ApiVersionsResponse.java | 68 ++++++++++++++++----
 .../requests/ConsumerGroupHeartbeatRequest.java    | 73 +++++++++++++++++++++
 .../requests/ConsumerGroupHeartbeatResponse.java   | 74 ++++++++++++++++++++++
 .../message/ConsumerGroupHeartbeatRequest.json     | 69 ++++++++++++++++++++
 .../message/ConsumerGroupHeartbeatResponse.json    | 70 ++++++++++++++++++++
 .../kafka/common/message/ApiMessageTypeTest.java   |  2 +-
 .../kafka/common/protocol/ProtoUtilsTest.java      |  1 +
 .../common/requests/ApiVersionsResponseTest.java   | 15 +++--
 .../kafka/common/requests/RequestResponseTest.java | 44 +++++++++++++
 .../scala/kafka/network/RequestConvertToJson.scala |  2 +
 .../main/scala/kafka/network/SocketServer.scala    |  4 +-
 .../scala/kafka/server/ApiVersionManager.scala     | 54 +++++++++-------
 .../main/scala/kafka/server/ControllerServer.scala |  5 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   | 12 +++-
 core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++
 .../main/scala/kafka/tools/TestRaftServer.scala    |  2 +-
 .../kafka/admin/BrokerApiVersionsCommandTest.scala |  1 +
 .../unit/kafka/network/SocketServerTest.scala      |  2 +-
 .../server/AbstractApiVersionsRequestTest.scala    | 31 ++++++---
 .../unit/kafka/server/ApiVersionManagerTest.scala  | 47 +++++++++++---
 .../unit/kafka/server/ApiVersionsRequestTest.scala |  9 ++-
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 64 +++++++++++++++++++
 .../unit/kafka/server/ControllerApisTest.scala     |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 18 +++++-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  5 ++
 .../kafka/message/ApiMessageTypeGenerator.java     | 28 +++++++-
 .../java/org/apache/kafka/message/MessageSpec.java | 17 ++++-
 .../jmh/metadata/MetadataRequestBenchmark.java     |  2 +-
 35 files changed, 778 insertions(+), 79 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpochException.java
new file mode 100644
index 00000000000..82b61eedcf3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/FencedMemberEpochException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class FencedMemberEpochException extends ApiException {
+    public FencedMemberEpochException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnreleasedInstanceIdException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnreleasedInstanceIdException.java
new file mode 100644
index 00000000000..0802f54151a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnreleasedInstanceIdException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class UnreleasedInstanceIdException extends ApiException {
+    public UnreleasedInstanceIdException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedAssignorException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedAssignorException.java
new file mode 100644
index 00000000000..c3859ccbfce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedAssignorException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class UnsupportedAssignorException extends ApiException {
+    public UnsupportedAssignorException(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index f727bd18e5f..3ba8faaba29 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
@@ -27,6 +28,7 @@ import java.util.EnumMap;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -108,7 +110,8 @@ public enum ApiKeys {
     UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true),
     DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS),
     LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS),
-    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true);
+    ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true),
+    CONSUMER_GROUP_HEARTBEAT(ApiMessageType.CONSUMER_GROUP_HEARTBEAT);
 
     private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
         new EnumMap<>(ApiMessageType.ListenerType.class);
@@ -193,7 +196,11 @@ public enum ApiKeys {
     }
 
     public short latestVersion() {
-        return messageType.highestSupportedVersion();
+        return messageType.highestSupportedVersion(true);
+    }
+
+    public short latestVersion(boolean enableUnstableLastVersion) {
+        return messageType.highestSupportedVersion(enableUnstableLastVersion);
     }
 
     public short oldestVersion() {
@@ -212,6 +219,30 @@ public enum ApiKeys {
         return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
     }
 
+    public boolean isVersionEnabled(short apiVersion, boolean enableUnstableLastVersion) {
+        // ApiVersions API is a particular case. The client always send the highest version
+        // that it supports and the server fails back to version 0 if it does not know it.
+        // Hence, we have to accept any versions here, even unsupported ones.
+        if (this == ApiKeys.API_VERSIONS) return true;
+
+        return apiVersion >= oldestVersion() && apiVersion <= latestVersion(enableUnstableLastVersion);
+    }
+
+    public Optional<ApiVersionsResponseData.ApiVersion> toApiVersion(boolean enableUnstableLastVersion) {
+        short oldestVersion = oldestVersion();
+        short latestVersion = latestVersion(enableUnstableLastVersion);
+
+        // API is entirely disabled if latestStableVersion is smaller than oldestVersion.
+        if (latestVersion >= oldestVersion) {
+            return Optional.of(new ApiVersionsResponseData.ApiVersion()
+               .setApiKey(messageType.apiKey())
+               .setMinVersion(oldestVersion)
+               .setMaxVersion(latestVersion));
+        } else {
+            return Optional.empty();
+        }
+    }
+
     public short requestHeaderVersion(short apiVersion) {
         return messageType.requestHeaderVersion(apiVersion);
     }
@@ -288,5 +319,4 @@ public enum ApiKeys {
             .collect(Collectors.toList());
         return EnumSet.copyOf(apis);
     }
-
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 6fd3aa41ca0..b5ea650b166 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -39,6 +39,7 @@ import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException;
 import org.apache.kafka.common.errors.FeatureUpdateFailedException;
 import org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.FencedLeaderEpochException;
+import org.apache.kafka.common.errors.FencedMemberEpochException;
 import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
 import org.apache.kafka.common.errors.FetchSessionTopicIdException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
@@ -122,7 +123,9 @@ import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.errors.UnreleasedInstanceIdException;
 import org.apache.kafka.common.errors.UnstableOffsetCommitException;
+import org.apache.kafka.common.errors.UnsupportedAssignorException;
 import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
@@ -372,7 +375,10 @@ public enum Errors {
     FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
     INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
     NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
-    OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);
+    OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
+    FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoin.", FencedMemberEpochException::new),
+    UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new),
+    UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 0d96d842d6d..1988347a974 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -303,6 +303,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
                 return ListTransactionsRequest.parse(buffer, apiVersion);
             case ALLOCATE_PRODUCER_IDS:
                 return AllocateProducerIdsRequest.parse(buffer, apiVersion);
+            case CONSUMER_GROUP_HEARTBEAT:
+                return ConsumerGroupHeartbeatRequest.parse(buffer, apiVersion);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 7e4425d3e79..4b55a6d582d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -247,6 +247,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
                 return ListTransactionsResponse.parse(responseBuffer, version);
             case ALLOCATE_PRODUCER_IDS:
                 return AllocateProducerIdsResponse.parse(responseBuffer, version);
+            case CONSUMER_GROUP_HEARTBEAT:
+                return ConsumerGroupHeartbeatResponse.parse(responseBuffer, version);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 17986fa1086..ac4562bf873 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -116,7 +116,23 @@ public class ApiVersionsResponse extends AbstractResponse {
         int throttleTimeMs,
         ApiMessageType.ListenerType listenerType
     ) {
-        return createApiVersionsResponse(throttleTimeMs, filterApis(RecordVersion.current(), listenerType), Features.emptySupportedFeatures());
+        return createApiVersionsResponse(
+            throttleTimeMs,
+            filterApis(RecordVersion.current(), listenerType, true),
+            Features.emptySupportedFeatures()
+        );
+    }
+
+    public static ApiVersionsResponse defaultApiVersionsResponse(
+        int throttleTimeMs,
+        ApiMessageType.ListenerType listenerType,
+        boolean enableUnstableLastVersion
+    ) {
+        return createApiVersionsResponse(
+            throttleTimeMs,
+            filterApis(RecordVersion.current(), listenerType, enableUnstableLastVersion),
+            Features.emptySupportedFeatures()
+        );
     }
 
     public static ApiVersionsResponse createApiVersionsResponse(
@@ -146,14 +162,23 @@ public class ApiVersionsResponse extends AbstractResponse {
         Map<String, Short> finalizedFeatures,
         long finalizedFeaturesEpoch,
         NodeApiVersions controllerApiVersions,
-        ListenerType listenerType
+        ListenerType listenerType,
+        boolean enableUnstableLastVersion
     ) {
         ApiVersionCollection apiKeys;
         if (controllerApiVersions != null) {
             apiKeys = intersectForwardableApis(
-                listenerType, minRecordVersion, controllerApiVersions.allSupportedApiVersions());
+                listenerType,
+                minRecordVersion,
+                controllerApiVersions.allSupportedApiVersions(),
+                enableUnstableLastVersion
+            );
         } else {
-            apiKeys = filterApis(minRecordVersion, listenerType);
+            apiKeys = filterApis(
+                minRecordVersion,
+                listenerType,
+                enableUnstableLastVersion
+            );
         }
 
         return createApiVersionsResponse(
@@ -187,20 +212,31 @@ public class ApiVersionsResponse extends AbstractResponse {
     public static ApiVersionCollection filterApis(
         RecordVersion minRecordVersion,
         ApiMessageType.ListenerType listenerType
+    ) {
+        return filterApis(minRecordVersion, listenerType, false);
+    }
+
+    public static ApiVersionCollection filterApis(
+        RecordVersion minRecordVersion,
+        ApiMessageType.ListenerType listenerType,
+        boolean enableUnstableLastVersion
     ) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
         for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
             if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
-                apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
+                apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(apiKeys::add);
             }
         }
         return apiKeys;
     }
 
-    public static ApiVersionCollection collectApis(Set<ApiKeys> apiKeys) {
+    public static ApiVersionCollection collectApis(
+        Set<ApiKeys> apiKeys,
+        boolean enableUnstableLastVersion
+    ) {
         ApiVersionCollection res = new ApiVersionCollection();
         for (ApiKeys apiKey : apiKeys) {
-            res.add(ApiVersionsResponse.toApiVersion(apiKey));
+            apiKey.toApiVersion(enableUnstableLastVersion).ifPresent(res::add);
         }
         return res;
     }
@@ -212,24 +248,32 @@ public class ApiVersionsResponse extends AbstractResponse {
      * @param listenerType the listener type which constrains the set of exposed APIs
      * @param minRecordVersion min inter broker magic
      * @param activeControllerApiVersions controller ApiVersions
+     * @param enableUnstableLastVersion whether unstable versions should be advertised or not
      * @return commonly agreed ApiVersion collection
      */
     public static ApiVersionCollection intersectForwardableApis(
         final ApiMessageType.ListenerType listenerType,
         final RecordVersion minRecordVersion,
-        final Map<ApiKeys, ApiVersion> activeControllerApiVersions
+        final Map<ApiKeys, ApiVersion> activeControllerApiVersions,
+        boolean enableUnstableLastVersion
     ) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
         for (ApiKeys apiKey : ApiKeys.apisForListener(listenerType)) {
             if (apiKey.minRequiredInterBrokerMagic <= minRecordVersion.value) {
-                ApiVersion brokerApiVersion = toApiVersion(apiKey);
+                final Optional<ApiVersion> brokerApiVersion = apiKey.toApiVersion(enableUnstableLastVersion);
+                if (!brokerApiVersion.isPresent()) {
+                    // Broker does not support this API key.
+                    continue;
+                }
 
                 final ApiVersion finalApiVersion;
                 if (!apiKey.forwardable) {
-                    finalApiVersion = brokerApiVersion;
+                    finalApiVersion = brokerApiVersion.get();
                 } else {
-                    Optional<ApiVersion> intersectVersion = intersect(brokerApiVersion,
-                        activeControllerApiVersions.getOrDefault(apiKey, null));
+                    Optional<ApiVersion> intersectVersion = intersect(
+                        brokerApiVersion.get(),
+                        activeControllerApiVersions.getOrDefault(apiKey, null)
+                    );
                     if (intersectVersion.isPresent()) {
                         finalApiVersion = intersectVersion.get();
                     } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
new file mode 100644
index 00000000000..215e18ea2de
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
+
+    public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
+        private final ConsumerGroupHeartbeatRequestData data;
+
+        public Builder(ConsumerGroupHeartbeatRequestData data) {
+            super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+            this.data = data;
+        }
+
+        @Override
+        public ConsumerGroupHeartbeatRequest build(short version) {
+            return new ConsumerGroupHeartbeatRequest(data, version);
+        }
+
+        @Override
+        public String toString() {
+            return data.toString();
+        }
+    }
+
+    private final ConsumerGroupHeartbeatRequestData data;
+
+    public ConsumerGroupHeartbeatRequest(ConsumerGroupHeartbeatRequestData data, short version) {
+        super(ApiKeys.CONSUMER_GROUP_HEARTBEAT, version);
+        this.data = data;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+        return new ConsumerGroupHeartbeatResponse(
+            new ConsumerGroupHeartbeatResponseData()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setErrorCode(Errors.forException(e).code())
+        );
+    }
+
+    @Override
+    public ConsumerGroupHeartbeatRequestData data() {
+        return data;
+    }
+
+    public static ConsumerGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
+        return new ConsumerGroupHeartbeatRequest(new ConsumerGroupHeartbeatRequestData(
+            new ByteBufferAccessor(buffer), version), version);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
new file mode 100644
index 00000000000..6f881cc8efa
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Possible error codes.
+ *
+ * - {@link Errors#GROUP_AUTHORIZATION_FAILED}
+ * - {@link Errors#NOT_COORDINATOR}
+ * - {@link Errors#COORDINATOR_NOT_AVAILABLE}
+ * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
+ * - {@link Errors#INVALID_REQUEST}
+ * - {@link Errors#UNKNOWN_MEMBER_ID}
+ * - {@link Errors#FENCED_MEMBER_EPOCH}
+ * - {@link Errors#UNSUPPORTED_ASSIGNOR}
+ * - {@link Errors#UNRELEASED_INSTANCE_ID}
+ * - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ */
+public class ConsumerGroupHeartbeatResponse extends AbstractResponse {
+    private final ConsumerGroupHeartbeatResponseData data;
+
+    public ConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData data) {
+        super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
+        this.data = data;
+    }
+
+    @Override
+    public ConsumerGroupHeartbeatResponseData data() {
+        return data;
+    }
+
+    @Override
+    public Map<Errors, Integer> errorCounts() {
+        return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
+    }
+
+    @Override
+    public int throttleTimeMs() {
+        return data.throttleTimeMs();
+    }
+
+    @Override
+    public void maybeSetThrottleTimeMs(int throttleTimeMs) {
+        data.setThrottleTimeMs(throttleTimeMs);
+    }
+
+    public static ConsumerGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
+        return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData(
+            new ByteBufferAccessor(buffer), version));
+    }
+}
diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
new file mode 100644
index 00000000000..c63996604e7
--- /dev/null
+++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ConsumerGroupHeartbeatRequest",
+  // The ConsumerGroupHeartbeat API is added as part of KIP-848 and is still
+  // under developement. Hence, the API is not exposed by default by brokers
+  // unless explicitely enabled.
+  "latestVersionUnstable": true,
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
+      "about": "The group identifier." },
+    { "name": "MemberId", "type": "string", "versions": "0+",
+      "about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will rejoin." },
+    { "name": "InstanceId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last heartbeat; the instance Id otherwise." },
+    { "name": "RackId", "type": "string", "versions": "0+",  "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided or if it didn't change since the last heartbeat; the rack ID of consumer otherwise." },
+    { "name": "RebalanceTimeoutMs", "type": "int32", "versions": "0+", "default": -1,
+      "about": "-1 if it didn't chance since the last heartbeat; the maximum time in milliseconds that the coordinator will wait on the member to revoke its partitions otherwise." },
+    { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", "nullableVersions": "0+", "default": "null", "entityType": "topicName",
+      "about": "null if it didn't change since the last heartbeat; the subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the subscribed topic regex otherwise" },
+    { "name": "ServerAssignor", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not used or if it didn't change since the last heartbeat; the server side assignor to use otherwise." },
+    { "name": "ClientAssignors", "type": "[]Assignor", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not used or if it didn't change since the last heartbeat; the list of client-side assignors otherwise.", "fields": [
+      { "name": "Name", "type": "string", "versions": "0+",
+        "about": "The name of the assignor." },
+      { "name": "MinimumVersion", "type": "int16", "versions": "0+",
+        "about": "The minimum supported version for the metadata." },
+      { "name": "MaximumVersion", "type": "int16", "versions": "0+",
+        "about": "The maximum supported version for the metadata." },
+      { "name": "Reason", "type": "int8", "versions": "0+",
+        "about": "The reason of the metadata update." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The metadata." }
+    ]},
+    { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the partitions owned by the member.", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "0+",
+        "about": "The topic ID." },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "The partitions." }
+    ]}
+  ]
+}
diff --git a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
new file mode 100644
index 00000000000..b2e8992d7da
--- /dev/null
+++ b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED (version 0+)
+  // - NOT_COORDINATOR (version 0+)
+  // - COORDINATOR_NOT_AVAILABLE (version 0+)
+  // - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
+  // - INVALID_REQUEST (version 0+)
+  // - UNKNOWN_MEMBER_ID (version 0+)
+  // - FENCED_MEMBER_EPOCH (version 0+)
+  // - UNSUPPORTED_ASSIGNOR (version 0+)
+  // - UNRELEASED_INSTANCE_ID (version 0+)
+  // - GROUP_MAX_SIZE_REACHED (version 0+)
+  "fields": [
+    { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
+      "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+    { "name": "ErrorCode", "type": "int16", "versions": "0+",
+      "about": "The top-level error code, or 0 if there was no error" },
+    { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The top-level error message, or null if there was no error." },
+    { "name": "MemberId", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "The member id generated by the coordinator. Only provided when the member joins with MemberEpoch == 0." },
+    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
+      "about": "The member epoch." },
+    { "name": "ShouldComputeAssignment", "type": "bool", "versions": "0+",
+      "about": "True if the member should compute the assignment for the group." },
+    { "name": "HeartbeatIntervalMs", "type": "int32", "versions": "0+",
+      "about": "The heartbeat interval in milliseconds." },
+    { "name": "Assignment", "type": "Assignment", "versions": "0+", "nullableVersions": "0+", "default": "null",
+      "about": "null if not provided; the assignment otherwise.", "fields": [
+      { "name": "Error", "type": "int8", "versions": "0+",
+        "about": "The assigned error." },
+      { "name": "AssignedTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that can be used immediately." },
+      { "name": "PendingTopicPartitions", "type": "[]TopicPartitions", "versions": "0+",
+        "about": "The partitions assigned to the member that cannot be used because they are not released by their former owners yet." },
+      { "name": "MetadataVersion", "type": "int16", "versions": "0+",
+        "about": "The version of the metadata." },
+      { "name": "MetadataBytes", "type": "bytes", "versions": "0+",
+        "about": "The assigned metadata." }
+    ]}
+  ],
+  "commonStructs": [
+    { "name": "TopicPartitions", "versions": "0+", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "0+",
+        "about": "The topic ID." },
+      { "name": "Partitions", "type": "[]int32", "versions": "0+",
+        "about": "The partitions." }
+    ]}
+  ]
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/message/ApiMessageTypeTest.java b/clients/src/test/java/org/apache/kafka/common/message/ApiMessageTypeTest.java
index 7dc61478d07..40da2701eac 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/ApiMessageTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/ApiMessageTypeTest.java
@@ -111,7 +111,7 @@ public class ApiMessageTypeTest {
             for (Schema schema : type.responseSchemas())
                 assertNotNull(schema);
 
-            assertEquals(type.highestSupportedVersion() + 1, type.requestSchemas().length);
+            assertEquals(type.highestSupportedVersion(true) + 1, type.requestSchemas().length);
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
index 712c61168f4..74413dc6549 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
@@ -35,6 +35,7 @@ public class ProtoUtilsTest {
                 case RENEW_DELEGATION_TOKEN:
                 case ALTER_USER_SCRAM_CREDENTIALS:
                 case ENVELOPE:
+                case CONSUMER_GROUP_HEARTBEAT:
                     assertTrue(key.requiresDelayedAllocation, key + " should require delayed allocation");
                     break;
                 default:
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index 2e498339256..b34304000ae 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -101,7 +101,8 @@ public class ApiVersionsResponseTest {
         ApiVersionCollection commonResponse = ApiVersionsResponse.intersectForwardableApis(
             ApiMessageType.ListenerType.ZK_BROKER,
             RecordVersion.current(),
-            activeControllerApiVersions
+            activeControllerApiVersions,
+            true
         );
 
         verifyVersions(forwardableAPIKey.id, minVersion, maxVersion, commonResponse);
@@ -119,7 +120,8 @@ public class ApiVersionsResponseTest {
             Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
-            ListenerType.ZK_BROKER
+            ListenerType.ZK_BROKER,
+            true
         );
         verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
         assertEquals(10, response.throttleTimeMs());
@@ -138,7 +140,8 @@ public class ApiVersionsResponseTest {
             Utils.mkMap(Utils.mkEntry("feature", (short) 3)),
             10L,
             null,
-            ListenerType.ZK_BROKER
+            ListenerType.ZK_BROKER,
+            true
         );
 
         verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1);
@@ -165,7 +168,8 @@ public class ApiVersionsResponseTest {
             Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
-            ListenerType.ZK_BROKER
+            ListenerType.ZK_BROKER,
+            true
         );
         assertEquals(new HashSet<>(ApiKeys.zkBrokerApis()), apiKeysInResponse(response));
         assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs());
@@ -183,7 +187,8 @@ public class ApiVersionsResponseTest {
             Collections.emptyMap(),
             ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH,
             null,
-            ListenerType.ZK_BROKER
+            ListenerType.ZK_BROKER,
+            true
         );
 
         // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index b4212f10cb1..a1b399a2d35 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -62,6 +62,8 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
 import org.apache.kafka.common.message.BrokerHeartbeatResponseData;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData;
 import org.apache.kafka.common.message.BrokerRegistrationResponseData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
@@ -1047,6 +1049,7 @@ public class RequestResponseTest {
             case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsRequest(version);
             case LIST_TRANSACTIONS: return createListTransactionsRequest(version);
             case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsRequest(version);
+            case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatRequest(version);
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
@@ -1121,10 +1124,51 @@ public class RequestResponseTest {
             case DESCRIBE_TRANSACTIONS: return createDescribeTransactionsResponse();
             case LIST_TRANSACTIONS: return createListTransactionsResponse();
             case ALLOCATE_PRODUCER_IDS: return createAllocateProducerIdsResponse();
+            case CONSUMER_GROUP_HEARTBEAT: return createConsumerGroupHeartbeatResponse();
             default: throw new IllegalArgumentException("Unknown API key " + apikey);
         }
     }
 
+    private ConsumerGroupHeartbeatRequest createConsumerGroupHeartbeatRequest(short version) {
+        ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData()
+            .setGroupId("group")
+            .setMemberId("memberid")
+            .setMemberEpoch(10)
+            .setRebalanceTimeoutMs(60000)
+            .setServerAssignor("range")
+            .setRackId("rackid")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setTopicPartitions(Arrays.asList(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(Uuid.randomUuid())
+                    .setPartitions(Arrays.asList(0, 1, 2)),
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(Uuid.randomUuid())
+                    .setPartitions(Arrays.asList(3, 4, 5))
+            ));
+        return new ConsumerGroupHeartbeatRequest.Builder(data).build(version);
+    }
+
+    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse() {
+        ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData()
+            .setErrorCode(Errors.NONE.code())
+            .setThrottleTimeMs(1000)
+            .setMemberId("memberid")
+            .setMemberEpoch(11)
+            .setShouldComputeAssignment(false)
+            .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment()
+                .setAssignedTopicPartitions(Arrays.asList(
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(Uuid.randomUuid())
+                        .setPartitions(Arrays.asList(0, 1, 2)),
+                    new ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                        .setTopicId(Uuid.randomUuid())
+                        .setPartitions(Arrays.asList(3, 4, 5))
+                ))
+            );
+        return new ConsumerGroupHeartbeatResponse(data);
+    }
+
     private FetchSnapshotRequest createFetchSnapshotRequest(short version) {
         FetchSnapshotRequestData data = new FetchSnapshotRequestData()
                 .setClusterId("clusterId")
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index 6a374e7afb2..771a86e8f0f 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -95,6 +95,7 @@ object RequestConvertToJson {
       case req: DescribeProducersRequest => DescribeProducersRequestDataJsonConverter.write(req.data, request.version)
       case req: DescribeTransactionsRequest => DescribeTransactionsRequestDataJsonConverter.write(req.data, request.version)
       case req: ListTransactionsRequest => ListTransactionsRequestDataJsonConverter.write(req.data, request.version)
+      case req: ConsumerGroupHeartbeatRequest => ConsumerGroupHeartbeatRequestDataJsonConverter.write(req.data, request.version)
       case _ => throw new IllegalStateException(s"ApiKey ${request.apiKey} is not currently handled in `request`, the " +
         "code should be updated to do so.");
     }
@@ -170,6 +171,7 @@ object RequestConvertToJson {
       case res: DescribeProducersResponse => DescribeProducersResponseDataJsonConverter.write(res.data, version)
       case res: DescribeTransactionsResponse => DescribeTransactionsResponseDataJsonConverter.write(res.data, version)
       case res: ListTransactionsResponse => ListTransactionsResponseDataJsonConverter.write(res.data, version)
+      case res: ConsumerGroupHeartbeatResponse => ConsumerGroupHeartbeatResponseDataJsonConverter.write(res.data, version)
       case _ => throw new IllegalStateException(s"ApiKey ${response.apiKey} is not currently handled in `response`, the " +
         "code should be updated to do so.");
     }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 90769a376fb..dca6c108374 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1108,10 +1108,10 @@ private[kafka] class Processor(
 
   protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
     val header = RequestHeader.parse(buffer)
-    if (apiVersionManager.isApiEnabled(header.apiKey)) {
+    if (apiVersionManager.isApiEnabled(header.apiKey, header.apiVersion)) {
       header
     } else {
-      throw new InvalidRequestException(s"Received request api key ${header.apiKey} which is not enabled")
+      throw new InvalidRequestException(s"Received request api key ${header.apiKey} with version ${header.apiVersion} which is not enabled")
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala b/core/src/main/scala/kafka/server/ApiVersionManager.scala
index 20dc043563b..92e6d25cecb 100644
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ b/core/src/main/scala/kafka/server/ApiVersionManager.scala
@@ -26,10 +26,13 @@ import org.apache.kafka.common.requests.ApiVersionsResponse
 import scala.jdk.CollectionConverters._
 
 trait ApiVersionManager {
+  def enableUnstableLastVersion: Boolean
   def listenerType: ListenerType
   def enabledApis: collection.Set[ApiKeys]
   def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse
-  def isApiEnabled(apiKey: ApiKeys): Boolean = enabledApis.contains(apiKey)
+  def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
+    apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
+  }
   def newRequestMetrics: RequestChannel.Metrics = new network.RequestChannel.Metrics(enabledApis)
 }
 
@@ -45,7 +48,8 @@ object ApiVersionManager {
       listenerType,
       forwardingManager,
       supportedFeatures,
-      metadataCache
+      metadataCache,
+      config.unstableApiVersionsEnabled
     )
   }
 }
@@ -53,14 +57,23 @@ object ApiVersionManager {
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
   val enabledApis: collection.Set[ApiKeys],
-  brokerFeatures: Features[SupportedVersionRange]
+  brokerFeatures: Features[SupportedVersionRange],
+  val enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
-  def this(listenerType: ListenerType) = {
-    this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures())
+  def this(
+    listenerType: ListenerType,
+    enableUnstableLastVersion: Boolean
+  ) = {
+    this(
+      listenerType,
+      ApiKeys.apisForListener(listenerType).asScala,
+      BrokerFeatures.defaultSupportedFeatures(),
+      enableUnstableLastVersion
+    )
   }
 
-  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
+  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = {
     ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures)
@@ -71,29 +84,26 @@ class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
   features: BrokerFeatures,
-  metadataCache: MetadataCache
+  metadataCache: MetadataCache,
+  val enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
+  val enabledApis = ApiKeys.apisForListener(listenerType).asScala
+
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
     val supportedFeatures = features.supportedFeatures
     val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
     ApiVersionsResponse.createApiVersionsResponse(
-        throttleTimeMs,
-        metadataCache.metadataVersion().highestSupportedRecordVersion,
-        supportedFeatures,
-        finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
-        finalizedFeatures.epoch,
-        controllerApiVersions.orNull,
-        listenerType)
-  }
-
-  override def enabledApis: collection.Set[ApiKeys] = {
-    ApiKeys.apisForListener(listenerType).asScala
-  }
-
-  override def isApiEnabled(apiKey: ApiKeys): Boolean = {
-    apiKey.inScope(listenerType)
+      throttleTimeMs,
+      metadataCache.metadataVersion().highestSupportedRecordVersion,
+      supportedFeatures,
+      finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava,
+      finalizedFeatures.epoch,
+      controllerApiVersions.orNull,
+      listenerType,
+      enableUnstableLastVersion
+    )
   }
 }
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3a9321720ee..4f2ad837b83 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -170,7 +170,10 @@ class ControllerServer(
           }.toMap
       }
 
-      val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
+      val apiVersionManager = new SimpleApiVersionManager(
+        ListenerType.CONTROLLER,
+        config.unstableApiVersionsEnabled
+      )
 
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
       credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2e86da4aa9..7b65bbe50d5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -168,10 +168,10 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
         s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
 
-      if (!apiVersionManager.isApiEnabled(request.header.apiKey)) {
+      if (!apiVersionManager.isApiEnabled(request.header.apiKey, request.header.apiVersion)) {
         // The socket server will reject APIs which are not exposed in this scope and close the connection
         // before handing them to the request handler, so this path should not be exercised in practice
-        throw new IllegalStateException(s"API ${request.header.apiKey} is not enabled")
+        throw new IllegalStateException(s"API ${request.header.apiKey} with version ${request.header.apiVersion} is not enabled")
       }
 
       request.header.apiKey match {
@@ -237,6 +237,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
         case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
         case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
+        case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
         case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
       }
     } catch {
@@ -3568,6 +3569,13 @@ class KafkaApis(val requestChannel: RequestChannel,
       )
   }
 
+  def handleConsumerGroupHeartbeat(request: RequestChannel.Request): CompletableFuture[Unit] = {
+    val consumerGroupHeartbeatRequest = request.body[ConsumerGroupHeartbeatRequest]
+    // KIP-848 is not implemented yet so return UNSUPPORTED_VERSION.
+    requestHelper.sendMaybeThrottle(request, consumerGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+    CompletableFuture.completedFuture[Unit](())
+  }
+
   private def updateRecordConversionStats(request: RequestChannel.Request,
                                           tp: TopicPartition,
                                           conversionStats: RecordConversionStats): Unit = {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 7725af31d20..a37868675fe 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -621,6 +621,9 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthProp =  "password.encoder.key.length"
   val PasswordEncoderIterationsProp =  "password.encoder.iterations"
 
+  /** Internal Configurations **/
+  val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
+
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form <code>hostname:port</code> where host and port are the " +
@@ -1404,6 +1407,10 @@ object KafkaConfig {
       .define(RaftConfig.QUORUM_LINGER_MS_CONFIG, INT, Defaults.QuorumLingerMs, null, MEDIUM, RaftConfig.QUORUM_LINGER_MS_DOC)
       .define(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG, INT, Defaults.QuorumRequestTimeoutMs, null, MEDIUM, RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_DOC)
       .define(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG, INT, Defaults.QuorumRetryBackoffMs, null, LOW, RaftConfig.QUORUM_RETRY_BACKOFF_MS_DOC)
+
+      /** Internal Configurations **/
+      // This indicates whether unreleased APIs should be advertised by this broker.
+      .defineInternal(UnstableApiVersionsEnableProp, BOOLEAN, false, LOW)
   }
 
   /** ********* Remote Log Management Configuration *********/
@@ -1929,6 +1936,9 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val quorumRequestTimeoutMs = getInt(RaftConfig.QUORUM_REQUEST_TIMEOUT_MS_CONFIG)
   val quorumRetryBackoffMs = getInt(RaftConfig.QUORUM_RETRY_BACKOFF_MS_CONFIG)
 
+  /** Internal Configurations **/
+  val unstableApiVersionsEnabled = getBoolean(KafkaConfig.UnstableApiVersionsEnableProp)
+
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
     dynamicConfig.addReconfigurable(reconfigurable)
   }
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 0c31b4187a0..896ae2407e2 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -74,7 +74,7 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER)
+    val apiVersionManager = new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
     socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
     val metaProperties = MetaProperties(
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index a48a466b0b2..81a283c0fdf 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -41,6 +41,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
       props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
       props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
       props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
+      props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
       props
     }).map(KafkaConfig.fromProps)
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 459bcbce5b8..cc345ee4bdc 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -77,7 +77,7 @@ class SocketServerTest {
   // Clean-up any metrics left around by previous tests
   TestUtils.clearYammerMetrics()
 
-  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER)
+  private val apiVersionManager = new SimpleApiVersionManager(ListenerType.ZK_BROKER, true)
   val server = new SocketServer(config, metrics, Time.SYSTEM, credentialProvider, apiVersionManager)
   server.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)
   val sockets = new ArrayBuffer[Socket]
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index 99d593ede65..02ce2485974 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -21,7 +21,7 @@ import kafka.test.ClusterInstance
 import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
-import org.apache.kafka.common.message.{ApiMessageType, ApiVersionsResponseData}
+import org.apache.kafka.common.message.ApiMessageType
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record.RecordVersion
@@ -68,31 +68,42 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
     } finally socket.close()
   }
 
-  def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = cluster.clientListener()): Unit = {
+  def validateApiVersionsResponse(
+    apiVersionsResponse: ApiVersionsResponse,
+    listenerName: ListenerName = cluster.clientListener(),
+    enableUnstableLastVersion: Boolean = false
+  ): Unit = {
     val expectedApis = if (!cluster.isKRaftTest) {
-      ApiKeys.zkBrokerApis()
+      ApiVersionsResponse.collectApis(
+        ApiKeys.apisForListener(ApiMessageType.ListenerType.ZK_BROKER),
+        enableUnstableLastVersion
+      )
     } else if (cluster.controllerListenerName().asScala.contains(listenerName)) {
-      ApiKeys.controllerApis()
+      ApiVersionsResponse.collectApis(
+        ApiKeys.apisForListener(ApiMessageType.ListenerType.CONTROLLER),
+        enableUnstableLastVersion
+      )
     } else {
       ApiVersionsResponse.intersectForwardableApis(
         ApiMessageType.ListenerType.BROKER,
         RecordVersion.current,
-        NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions()
+        NodeApiVersions.create(ApiKeys.controllerApis().asScala.map(ApiVersionsResponse.toApiVersion).asJava).allSupportedApiVersions(),
+        enableUnstableLastVersion
       )
     }
 
-    assertEquals(expectedApis.size(), apiVersionsResponse.data.apiKeys().size(),
+    assertEquals(expectedApis.size, apiVersionsResponse.data.apiKeys.size,
       "API keys in ApiVersionsResponse must match API keys supported by broker.")
 
     val defaultApiVersionsResponse = if (!cluster.isKRaftTest) {
-      ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.ZK_BROKER)
+      ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.ZK_BROKER, enableUnstableLastVersion)
     } else if(cluster.controllerListenerName().asScala.contains(listenerName)) {
-      ApiVersionsResponse.defaultApiVersionsResponse(ListenerType.CONTROLLER)
+      ApiVersionsResponse.defaultApiVersionsResponse(0, ListenerType.CONTROLLER, enableUnstableLastVersion)
     } else {
-      ApiVersionsResponse.createApiVersionsResponse(0, expectedApis.asInstanceOf[ApiVersionsResponseData.ApiVersionCollection])
+      ApiVersionsResponse.createApiVersionsResponse(0, expectedApis)
     }
 
-    for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys().asScala) {
+    for (expectedApiVersion: ApiVersion <- defaultApiVersionsResponse.data.apiKeys.asScala) {
       val actualApiVersion = apiVersionsResponse.apiVersion(expectedApiVersion.apiKey)
       assertNotNull(actualApiVersion, s"API key ${expectedApiVersion.apiKey()} is supported by broker, but not received in ApiVersionsResponse.")
       assertEquals(expectedApiVersion.apiKey, actualApiVersion.apiKey, "API key must be supported by the broker.")
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index 5d94319ed35..bcc84443f16 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -40,10 +40,34 @@ class ApiVersionManagerTest {
       listenerType = apiScope,
       forwardingManager = None,
       features = brokerFeatures,
-      metadataCache = metadataCache
+      metadataCache = metadataCache,
+      enableUnstableLastVersion = true
     )
     assertEquals(ApiKeys.apisForListener(apiScope).asScala, versionManager.enabledApis)
-    assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall(versionManager.isApiEnabled))
+    assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall { apiKey =>
+      apiKey.allVersions.asScala.forall { version =>
+        versionManager.isApiEnabled(apiKey, version)
+      }
+    })
+  }
+
+  @ParameterizedTest
+  @EnumSource(classOf[ListenerType])
+  def testDisabledApis(apiScope: ListenerType): Unit = {
+    val versionManager = new DefaultApiVersionManager(
+      listenerType = apiScope,
+      forwardingManager = None,
+      features = brokerFeatures,
+      metadataCache = metadataCache,
+      enableUnstableLastVersion = false
+    )
+
+    ApiKeys.apisForListener(apiScope).forEach { apiKey =>
+      if (apiKey.messageType.latestVersionUnstable()) {
+        assertFalse(versionManager.isApiEnabled(apiKey, apiKey.latestVersion),
+          s"$apiKey version ${apiKey.latestVersion} should be disabled.")
+      }
+    }
   }
 
   @Test
@@ -63,7 +87,8 @@ class ApiVersionManagerTest {
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = Some(forwardingManager),
       features = brokerFeatures,
-      metadataCache = metadataCache
+      metadataCache = metadataCache,
+      enableUnstableLastVersion = true
     )
 
     val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@@ -83,9 +108,10 @@ class ApiVersionManagerTest {
         listenerType = ListenerType.BROKER,
         forwardingManager = forwardingManagerOpt,
         features = brokerFeatures,
-        metadataCache = metadataCache
+        metadataCache = metadataCache,
+        enableUnstableLastVersion = true
       )
-      assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+      assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
       assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
 
       val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@@ -104,9 +130,10 @@ class ApiVersionManagerTest {
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = Some(forwardingManager),
       features = brokerFeatures,
-      metadataCache = metadataCache
+      metadataCache = metadataCache,
+      enableUnstableLastVersion = true
     )
-    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
     assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
 
     val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
@@ -122,13 +149,13 @@ class ApiVersionManagerTest {
       listenerType = ListenerType.ZK_BROKER,
       forwardingManager = None,
       features = brokerFeatures,
-      metadataCache = metadataCache
+      metadataCache = metadataCache,
+      enableUnstableLastVersion = true
     )
-    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE, ApiKeys.ENVELOPE.latestVersion))
     assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
 
     val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0)
     assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
   }
-
 }
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index bc45b72077d..cea87138e73 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -21,7 +21,7 @@ import kafka.test.{ClusterConfig, ClusterInstance}
 import org.apache.kafka.common.message.ApiVersionsRequestData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.ApiVersionsRequest
-import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.BeforeEach
@@ -44,6 +44,13 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     validateApiVersionsResponse(apiVersionsResponse)
   }
 
+  @ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")))
+  def testApiVersionsRequestIncludesUnreleasedApis(): Unit = {
+    val request = new ApiVersionsRequest.Builder().build()
+    val apiVersionsResponse = sendApiVersionsRequest(request, cluster.clientListener())
+    validateApiVersionsResponse(apiVersionsResponse, enableUnstableLastVersion = true)
+  }
+
   @ClusterTest(clusterType = Type.ZK)
   def testApiVersionsRequestThroughControlPlaneListener(): Unit = {
     val request = new ApiVersionsRequest.Builder().build()
diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
new file mode 100644
index 00000000000..33e449f7531
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, ConsumerGroupHeartbeatResponseData}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
+import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.api.extension.ExtendWith
+
+import java.io.EOFException
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
+@Tag("integration")
+class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) {
+
+  @ClusterTest
+  def testConsumerGroupHeartbeatIsDisabledByDefault(): Unit = {
+    val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+    ).build()
+    assertThrows(classOf[EOFException], () => connectAndReceive(consumerGroupHeartbeatRequest))
+  }
+
+  @ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true")))
+  def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = {
+    val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+    ).build()
+
+    val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest)
+    val expectedResponse = new ConsumerGroupHeartbeatResponseData().setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    assertEquals(expectedResponse, consumerGroupHeartbeatResponse.data)
+  }
+
+  private def connectAndReceive(request: ConsumerGroupHeartbeatRequest): ConsumerGroupHeartbeatResponse = {
+    IntegrationTestUtils.connectAndReceive[ConsumerGroupHeartbeatResponse](
+      request,
+      cluster.anyBrokerSocketServer(),
+      cluster.clientListener()
+    )
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 969d57c0dd2..c5919282802 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -109,7 +109,7 @@ class ControllerApisTest {
       new KafkaConfig(props),
       MetaProperties("JgxuGe9URy-E-ceaL04lEw", nodeId = nodeId),
       Seq.empty,
-      new SimpleApiVersionManager(ListenerType.CONTROLLER)
+      new SimpleApiVersionManager(ListenerType.CONTROLLER, true)
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1df91216166..5c8bad2c233 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -182,7 +182,7 @@ class KafkaApisTest {
     } else {
       ApiKeys.apisForListener(listenerType).asScala.toSet
     }
-    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures())
+    val apiVersionManager = new SimpleApiVersionManager(listenerType, enabledApis, BrokerFeatures.defaultSupportedFeatures(), true)
 
     new KafkaApis(
       requestChannel = requestChannel,
@@ -5743,4 +5743,20 @@ class KafkaApisTest {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
     verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
   }
+
+  @Test
+  def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = {
+    val requestChannelRequest = buildRequest(
+      new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("group")
+      ).build()
+    )
+
+    createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val expectedHeartbeatResponse = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+    val response = verifyNoThrottling[ConsumerGroupHeartbeatResponse](requestChannelRequest)
+    assertEquals(expectedHeartbeatResponse, response.data)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 766861d0a34..4d638e138cd 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -81,6 +81,7 @@ class RequestQuotaTest extends BaseRequestTest {
     properties.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
     properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[RequestQuotaTest.TestAuthorizer].getName)
     properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[RequestQuotaTest.TestPrincipalBuilder].getName)
+    properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
   }
 
   @BeforeEach
@@ -643,6 +644,10 @@ class RequestQuotaTest extends BaseRequestTest {
 
         case ApiKeys.ALLOCATE_PRODUCER_IDS =>
           new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData())
+
+        case ApiKeys.CONSUMER_GROUP_HEARTBEAT =>
+          new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData())
+
         case _ =>
           throw new IllegalArgumentException("Unsupported API key " + apiKey)
     }
diff --git a/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java b/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
index 408e1a75ff2..b4fb674fef6 100644
--- a/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/ApiMessageTypeGenerator.java
@@ -154,10 +154,12 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("%n");
         generateAccessor("lowestSupportedVersion", "short");
         buffer.printf("%n");
-        generateAccessor("highestSupportedVersion", "short");
+        generateHighestSupportedVersion();
         buffer.printf("%n");
         generateAccessor("listeners", "EnumSet<ListenerType>");
         buffer.printf("%n");
+        generateAccessor("latestVersionUnstable", "boolean");
+        buffer.printf("%n");
         generateAccessor("apiKey", "short");
         buffer.printf("%n");
         generateAccessor("requestSchemas", "Schema[]");
@@ -210,7 +212,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
                     .collect(Collectors.toList());
             }
 
-            buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s)%s%n",
+            buffer.printf("%s(\"%s\", (short) %d, %s, %s, (short) %d, (short) %d, %s, %s)%s%n",
                 MessageGenerator.toSnakeCase(name).toUpperCase(Locale.ROOT),
                 MessageGenerator.capitalizeFirst(name),
                 entry.getKey(),
@@ -219,6 +221,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
                 apiData.requestSpec.struct().versions().lowest(),
                 apiData.requestSpec.struct().versions().highest(),
                 generateListenerTypeEnumSet(listeners),
+                apiData.requestSpec.latestVersionUnstable(),
                 (numProcessed == apis.size()) ? ";" : ",");
         }
     }
@@ -231,6 +234,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("private final short lowestSupportedVersion;%n");
         buffer.printf("private final short highestSupportedVersion;%n");
         buffer.printf("private final EnumSet<ListenerType> listeners;%n");
+        buffer.printf("private final boolean latestVersionUnstable;%n");
         headerGenerator.addImport(MessageGenerator.SCHEMA_CLASS);
         headerGenerator.addImport(MessageGenerator.ENUM_SET_CLASS);
     }
@@ -239,7 +243,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("ApiMessageType(String name, short apiKey, " +
             "Schema[] requestSchemas, Schema[] responseSchemas, " +
             "short lowestSupportedVersion, short highestSupportedVersion, " +
-            "EnumSet<ListenerType> listeners) {%n");
+            "EnumSet<ListenerType> listeners, boolean latestVersionUnstable) {%n");
         buffer.incrementIndent();
         buffer.printf("this.name = name;%n");
         buffer.printf("this.apiKey = apiKey;%n");
@@ -248,6 +252,7 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("this.lowestSupportedVersion = lowestSupportedVersion;%n");
         buffer.printf("this.highestSupportedVersion = highestSupportedVersion;%n");
         buffer.printf("this.listeners = listeners;%n");
+        buffer.printf("this.latestVersionUnstable = latestVersionUnstable;%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
     }
@@ -403,6 +408,23 @@ public final class ApiMessageTypeGenerator implements TypeClassGenerator {
         buffer.printf("}%n");
     }
 
+    private void generateHighestSupportedVersion() {
+        buffer.printf("public short highestSupportedVersion(boolean enableUnstableLastVersion) {%n");
+        buffer.incrementIndent();
+        buffer.printf("if (!this.latestVersionUnstable || enableUnstableLastVersion) {%n");
+        buffer.incrementIndent();
+        buffer.printf("return this.highestSupportedVersion;%n");
+        buffer.decrementIndent();
+        buffer.printf("} else {%n");
+        buffer.incrementIndent();
+        buffer.printf("// A negative value means that the API has no enabled versions.%n");
+        buffer.printf("return (short) (this.highestSupportedVersion - 1);%n");
+        buffer.decrementIndent();
+        buffer.printf("}%n");
+        buffer.decrementIndent();
+        buffer.printf("}%n");
+    }
+
     private void write(BufferedWriter writer) throws IOException {
         headerGenerator.buffer().write(writer);
         buffer.write(writer);
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
index 82866be78f5..337d8d44146 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageSpec.java
@@ -39,6 +39,8 @@ public final class MessageSpec {
 
     private final List<RequestListenerType> listeners;
 
+    private final boolean latestVersionUnstable;
+
     @JsonCreator
     public MessageSpec(@JsonProperty("name") String name,
                        @JsonProperty("validVersions") String validVersions,
@@ -47,7 +49,9 @@ public final class MessageSpec {
                        @JsonProperty("type") MessageSpecType type,
                        @JsonProperty("commonStructs") List<StructSpec> commonStructs,
                        @JsonProperty("flexibleVersions") String flexibleVersions,
-                       @JsonProperty("listeners") List<RequestListenerType> listeners) {
+                       @JsonProperty("listeners") List<RequestListenerType> listeners,
+                       @JsonProperty("latestVersionUnstable") boolean latestVersionUnstable
+    ) {
         this.struct = new StructSpec(name, validVersions, fields);
         this.apiKey = apiKey == null ? Optional.empty() : Optional.of(apiKey);
         this.type = Objects.requireNonNull(type);
@@ -70,6 +74,12 @@ public final class MessageSpec {
                 "messages with type `request`");
         }
         this.listeners = listeners;
+
+        if (latestVersionUnstable && type != MessageSpecType.REQUEST) {
+            throw new RuntimeException("The `latestVersionUnstable` property is only valid for " +
+                "messages with type `request`");
+        }
+        this.latestVersionUnstable = latestVersionUnstable;
     }
 
     public StructSpec struct() {
@@ -124,6 +134,11 @@ public final class MessageSpec {
         return listeners;
     }
 
+    @JsonProperty("latestVersionUnstable")
+    public boolean latestVersionUnstable() {
+        return latestVersionUnstable;
+    }
+
     public String dataClassName() {
         switch (type) {
             case HEADER:
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 5648ce2fb12..86c7ff01f41 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -199,7 +199,7 @@ public class MetadataRequestBenchmark {
             setClusterId("clusterId").
             setTime(Time.SYSTEM).
             setTokenManager(null).
-            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER)).
+            setApiVersionManager(new SimpleApiVersionManager(ApiMessageType.ListenerType.ZK_BROKER, false)).
             build();
     }