You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2019/05/08 04:35:34 UTC

[kafka] branch trunk updated: KAFKA-8158: Add EntityType for Kafka RPC fields (#6503)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5144660  KAFKA-8158: Add EntityType for Kafka RPC fields (#6503)
5144660 is described below

commit 5144660040839cee6e213b5146c1ae44340eddb0
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Tue May 7 21:35:17 2019 -0700

    KAFKA-8158: Add EntityType for Kafka RPC fields (#6503)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../common/message/AddOffsetsToTxnRequest.json     |  6 +-
 .../common/message/AddPartitionsToTxnRequest.json  |  6 +-
 .../common/message/AddPartitionsToTxnResponse.json |  2 +-
 .../common/message/AlterReplicaLogDirsRequest.json |  2 +-
 .../message/AlterReplicaLogDirsResponse.json       |  2 +-
 .../common/message/ControlledShutdownRequest.json  |  2 +-
 .../common/message/ControlledShutdownResponse.json |  2 +-
 .../common/message/CreatePartitionsRequest.json    |  4 +-
 .../common/message/CreatePartitionsResponse.json   |  2 +-
 .../common/message/CreateTopicsRequest.json        |  4 +-
 .../common/message/CreateTopicsResponse.json       |  2 +-
 .../common/message/DeleteGroupsRequest.json        |  2 +-
 .../common/message/DeleteGroupsResponse.json       |  2 +-
 .../common/message/DeleteRecordsRequest.json       |  2 +-
 .../common/message/DeleteRecordsResponse.json      |  2 +-
 .../common/message/DeleteTopicsRequest.json        |  2 +-
 .../common/message/DeleteTopicsResponse.json       |  2 +-
 .../common/message/DescribeConfigsResponse.json    |  2 +-
 .../common/message/DescribeGroupsRequest.json      |  2 +-
 .../common/message/DescribeGroupsResponse.json     |  2 +-
 .../common/message/DescribeLogDirsRequest.json     |  2 +-
 .../common/message/DescribeLogDirsResponse.json    |  2 +-
 .../message/ElectPreferredLeadersRequest.json      |  4 +-
 .../message/ElectPreferredLeadersResponse.json     |  2 +-
 .../resources/common/message/EndTxnRequest.json    |  4 +-
 .../resources/common/message/FetchRequest.json     |  4 +-
 .../resources/common/message/FetchResponse.json    |  4 +-
 .../common/message/FindCoordinatorResponse.json    |  2 +-
 .../resources/common/message/HeartbeatRequest.json |  2 +-
 .../common/message/InitProducerIdRequest.json      |  2 +-
 .../common/message/InitProducerIdResponse.json     |  2 +-
 .../resources/common/message/JoinGroupRequest.json |  2 +-
 .../common/message/LeaderAndIsrRequest.json        | 12 +--
 .../common/message/LeaderAndIsrResponse.json       |  2 +-
 .../common/message/LeaveGroupRequest.json          |  2 +-
 .../common/message/ListGroupsResponse.json         |  2 +-
 .../common/message/ListOffsetRequest.json          |  4 +-
 .../common/message/ListOffsetResponse.json         |  2 +-
 .../resources/common/message/MetadataRequest.json  |  2 +-
 .../resources/common/message/MetadataResponse.json | 10 +--
 .../common/message/OffsetCommitRequest.json        |  4 +-
 .../common/message/OffsetCommitResponse.json       |  2 +-
 .../common/message/OffsetFetchRequest.json         |  5 +-
 .../common/message/OffsetFetchResponse.json        |  2 +-
 .../message/OffsetForLeaderEpochRequest.json       |  2 +-
 .../message/OffsetForLeaderEpochResponse.json      |  2 +-
 .../resources/common/message/ProduceRequest.json   |  4 +-
 .../resources/common/message/ProduceResponse.json  |  2 +-
 .../common/message/StopReplicaRequest.json         |  6 +-
 .../common/message/StopReplicaResponse.json        |  2 +-
 .../resources/common/message/SyncGroupRequest.json |  2 +-
 .../common/message/TxnOffsetCommitRequest.json     |  6 +-
 .../common/message/TxnOffsetCommitResponse.json    |  2 +-
 .../common/message/UpdateMetadataRequest.json      | 23 +++---
 .../common/message/WriteTxnMarkersRequest.json     |  4 +-
 .../common/message/WriteTxnMarkersResponse.json    |  4 +-
 .../java/org/apache/kafka/message/EntityType.java  | 62 +++++++++++++++
 .../java/org/apache/kafka/message/FieldSpec.java   |  6 ++
 .../java/org/apache/kafka/message/FieldType.java   | 14 ++--
 .../org/apache/kafka/message/EntityTypeTest.java   | 89 ++++++++++++++++++++++
 60 files changed, 261 insertions(+), 102 deletions(-)

diff --git a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
index 981650f..604a960 100644
--- a/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddOffsetsToTxnRequest.json
@@ -20,13 +20,13 @@
   // Version 1 is the same as version 0.
   "validVersions": "0-1",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "0+",
+    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
       "about": "The transactional id corresponding to the transaction."},
-    { "name": "ProducerId", "type": "int64", "versions": "0+",
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
       "about": "Current producer id in use by the transactional id." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "Current epoch associated with the producer id." },
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The unique group identifier." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
index 1c71fa7..4d07b5b 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnRequest.json
@@ -20,15 +20,15 @@
   // Version 1 is the same as version 0.
   "validVersions": "0-1",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "0+",
+    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
       "about": "The transactional id corresponding to the transaction."},
-    { "name": "ProducerId", "type": "int64", "versions": "0+",
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
       "about": "Current producer id in use by the transactional id." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "Current epoch associated with the producer id." },
     { "name": "Topics", "type": "[]AddPartitionsToTxnTopic", "versions": "0+",
       "about": "The partitions to add to the transation.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The name of the topic." },
       { "name": "Partitions", "type": "[]int32", "versions": "0+",
         "about": "The partition indexes to add to the transaction" }
diff --git a/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json b/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
index 50ae5cd..ec69da4 100644
--- a/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
+++ b/clients/src/main/resources/common/message/AddPartitionsToTxnResponse.json
@@ -24,7 +24,7 @@
       "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Results", "type": "[]AddPartitionsToTxnTopicResult", "versions": "0+",
       "about": "The results for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Results", "type": "[]AddPartitionsToTxnPartitionResult", "versions": "0+", 
         "about": "The results for each partition", "fields": [
diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
index 4a00249..826c3ca 100644
--- a/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsRequest.json
@@ -26,7 +26,7 @@
         "about": "The absolute directory path." },
       { "name": "Topics", "type": "[]AlterReplicaLogDirTopic", "versions": "0+",
         "about": "The topics to add to the directory.",  "fields": [
-        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+        { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
           "about": "The topic name." },
         { "name": "Partitions", "type": "[]int32", "versions": "0+",
           "about": "The partition indexes." }
diff --git a/clients/src/main/resources/common/message/AlterReplicaLogDirsResponse.json b/clients/src/main/resources/common/message/AlterReplicaLogDirsResponse.json
index 2551a15..047baee 100644
--- a/clients/src/main/resources/common/message/AlterReplicaLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/AlterReplicaLogDirsResponse.json
@@ -24,7 +24,7 @@
       "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Results", "type": "[]AlterReplicaLogDirTopicResult", "versions": "0+",
       "about": "The results for each topic.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+",
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The name of the topic." },
       { "name": "Partitions", "type": "[]AlterReplicaLogDirPartitionResult", "versions": "0+",
         "about": "The results for each partition.", "fields": [
diff --git a/clients/src/main/resources/common/message/ControlledShutdownRequest.json b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
index 60ceaa5..a6c3d9d 100644
--- a/clients/src/main/resources/common/message/ControlledShutdownRequest.json
+++ b/clients/src/main/resources/common/message/ControlledShutdownRequest.json
@@ -26,7 +26,7 @@
   // Version 2 adds BrokerEpoch.
   "validVersions": "0-2",
   "fields": [
-    { "name": "BrokerId", "type": "int32", "versions": "0+",
+    { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The id of the broker for which controlled shutdown has been requested." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "default": "-1", "ignorable": true,
       "about": "The broker epoch." }
diff --git a/clients/src/main/resources/common/message/ControlledShutdownResponse.json b/clients/src/main/resources/common/message/ControlledShutdownResponse.json
index d0fbcf2..53ee8f7 100644
--- a/clients/src/main/resources/common/message/ControlledShutdownResponse.json
+++ b/clients/src/main/resources/common/message/ControlledShutdownResponse.json
@@ -24,7 +24,7 @@
       "about": "The top-level error code." },
     { "name": "RemainingPartitions", "type": "[]RemainingPartition", "versions": "0+",
       "about": "The partitions that the broker still leads.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "TopicName", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The name of the topic." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
         "about": "The index of the partition." }
diff --git a/clients/src/main/resources/common/message/CreatePartitionsRequest.json b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
index 2dc75c7..360e6f2 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsRequest.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsRequest.json
@@ -22,13 +22,13 @@
   "fields": [
     { "name": "Topics", "type": "[]CreatePartitionsTopic", "versions": "0+",
       "about": "Each topic that we want to create new partitions inside.",  "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Count", "type": "int32", "versions": "0+",
         "about": "The new partition count." },
       { "name": "Assignments", "type": "[]CreatePartitionsAssignment", "versions": "0+", "nullableVersions": "0+", 
         "about": "The new partition assignments.", "fields": [
-        { "name": "BrokerIds", "type": "[]int32", "versions": "0+",
+        { "name": "BrokerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
           "about": "The assigned broker IDs." }
       ]}
     ]},
diff --git a/clients/src/main/resources/common/message/CreatePartitionsResponse.json b/clients/src/main/resources/common/message/CreatePartitionsResponse.json
index 2a0c01e..2e79b1d 100644
--- a/clients/src/main/resources/common/message/CreatePartitionsResponse.json
+++ b/clients/src/main/resources/common/message/CreatePartitionsResponse.json
@@ -24,7 +24,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Results", "type": "[]CreatePartitionsTopicResult", "versions": "0+",
       "about": "The partition creation results for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The result error, or zero if there was no error."},
diff --git a/clients/src/main/resources/common/message/CreateTopicsRequest.json b/clients/src/main/resources/common/message/CreateTopicsRequest.json
index da6e6c2..842fb20 100644
--- a/clients/src/main/resources/common/message/CreateTopicsRequest.json
+++ b/clients/src/main/resources/common/message/CreateTopicsRequest.json
@@ -22,7 +22,7 @@
   "fields": [
     { "name": "Topics", "type": "[]CreatableTopic", "versions": "0+",
       "about": "The topics to create.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name." },
       { "name": "NumPartitions", "type": "int32", "versions": "0+",
         "about": "The number of partitions to create in the topic, or -1 if we are specifying a manual partition assignment." },
@@ -32,7 +32,7 @@
         "about": "The manual partition assignment, or the empty array if we are using automatic assignment.", "fields": [
         { "name": "PartitionIndex", "type": "int32", "versions": "0+", "mapKey": true,
           "about": "The partition index." },
-        { "name": "BrokerIds", "type": "[]int32", "versions": "0+",
+        { "name": "BrokerIds", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
           "about": "The brokers to place the partition on." }
       ]},
       { "name": "Configs", "type": "[]CreateableTopicConfig", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/CreateTopicsResponse.json b/clients/src/main/resources/common/message/CreateTopicsResponse.json
index 49e4d7b..864e5fa 100644
--- a/clients/src/main/resources/common/message/CreateTopicsResponse.json
+++ b/clients/src/main/resources/common/message/CreateTopicsResponse.json
@@ -26,7 +26,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]CreatableTopicResult", "versions": "0+",
       "about": "Results for each topic we tried to create.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name." },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The error code, or 0 if there was no error." },
diff --git a/clients/src/main/resources/common/message/DeleteGroupsRequest.json b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
index 8dd8172..5671f70 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsRequest.json
@@ -20,7 +20,7 @@
   // Version 1 is the same as version 0.
   "validVersions": "0-1",
   "fields": [
-    { "name": "GroupsNames", "type": "[]string", "versions": "0+",
+    { "name": "GroupsNames", "type": "[]string", "versions": "0+", "entityType": "groupId",
       "about": "The group names to delete." }
   ]
 }
diff --git a/clients/src/main/resources/common/message/DeleteGroupsResponse.json b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
index 818331b..693a124 100644
--- a/clients/src/main/resources/common/message/DeleteGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteGroupsResponse.json
@@ -24,7 +24,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Results", "type": "[]DeletableGroupResult", "versions": "0+",
       "about": "The deletion results", "fields": [
-      { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "GroupId", "type": "string", "versions": "0+", "mapKey": true, "entityType": "groupId",
         "about": "The group id" },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The deletion error, or 0 if the deletion succeeded." }
diff --git a/clients/src/main/resources/common/message/DeleteRecordsRequest.json b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
index be6c5e7..62e504f 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsRequest.json
@@ -22,7 +22,7 @@
   "fields": [
     { "name": "Topics", "type": "[]DeleteRecordsTopic", "versions": "0+",
       "about": "Each topic that we want to delete records from.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]DeleteRecordsPartition", "versions": "0+",
         "about": "Each partition that we want to delete records from.", "fields": [
diff --git a/clients/src/main/resources/common/message/DeleteRecordsResponse.json b/clients/src/main/resources/common/message/DeleteRecordsResponse.json
index 88ac4ab..38de018 100644
--- a/clients/src/main/resources/common/message/DeleteRecordsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteRecordsResponse.json
@@ -24,7 +24,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]DeleteRecordsTopicResult", "versions": "0+",
       "about": "Each topic that we wanted to delete records from.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]DeleteRecordsPartitionResult", "versions": "0+",
         "about": "Each partition that we wanted to delete records from.", "fields": [
diff --git a/clients/src/main/resources/common/message/DeleteTopicsRequest.json b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
index 269a3c0..12b202f 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsRequest.json
@@ -20,7 +20,7 @@
   // Versions 0, 1, 2, and 3 are the same.
   "validVersions": "0-3",
   "fields": [
-    { "name": "TopicNames", "type": "[]string", "versions": "0+",
+    { "name": "TopicNames", "type": "[]string", "versions": "0+", "entityType": "topicName",
       "about": "The names of the topics to delete" },
     { "name": "TimeoutMs", "type": "int32", "versions": "0+",
       "about": "The length of time in milliseconds to wait for the deletions to complete." }
diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
index 4cea44b..0d30a41 100644
--- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json
@@ -26,7 +26,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+",
       "about": "The results for each topic we tried to delete.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name" },
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The deletion error, or 0 if the deletion succeeded." }
diff --git a/clients/src/main/resources/common/message/DescribeConfigsResponse.json b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
index 89cb145..f1dc184 100644
--- a/clients/src/main/resources/common/message/DescribeConfigsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeConfigsResponse.json
@@ -43,7 +43,7 @@
           "about": "True if the configuration is read-only." },
         { "name": "IsDefault", "type": "bool", "versions": "0",
           "about": "True if the configuration is not set." },
-        // Note: the v0 default for this field that shouldd be exposed to callers is
+        // Note: the v0 default for this field that should be exposed to callers is
         // context-dependent. For example, if the resource is a broker, this should default to 4.
         // -1 is just a placeholder value.
         { "name": "ConfigSource", "type": "int8", "versions": "1+", "default": "-1", "ignorable": true,
diff --git a/clients/src/main/resources/common/message/DescribeGroupsRequest.json b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
index 3557bae..b6f33ed 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsRequest.json
@@ -21,7 +21,7 @@
   // Starting in version 3, authorized operations can be requested.
   "validVersions": "0-3",
   "fields": [
-    { "name": "Groups", "type": "[]string", "versions": "0+",
+    { "name": "Groups", "type": "[]string", "versions": "0+", "entityType": "groupId",
       "about": "The names of the groups to describe" },
     { "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "3+",
       "about": "Whether to include authorized operations." }
diff --git a/clients/src/main/resources/common/message/DescribeGroupsResponse.json b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
index f4677a7..e0cbc1e 100644
--- a/clients/src/main/resources/common/message/DescribeGroupsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeGroupsResponse.json
@@ -28,7 +28,7 @@
       "about": "Each described group.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The describe error, or 0 if there was no error." },
-      { "name": "GroupId", "type": "string", "versions": "0+",
+      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
         "about": "The group ID string." },
       { "name": "GroupState", "type": "string", "versions": "0+",
         "about": "The group state string, or the empty string." },
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
index b17cd4b..f602352 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsRequest.json
@@ -22,7 +22,7 @@
   "fields": [
     { "name": "Topics", "type": "[]DescribableLogDirTopic", "versions": "0+", "nullableVersions": "0+",
       "about": "Each topic that we want to describe log directories for, or null for all topics.", "fields": [
-      { "name": "Topic", "type": "string", "versions": "0+",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "PartitionIndex", "type": "[]int32", "versions": "0+",
         "about": "The partition indxes." }
diff --git a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
index 85355d3..cd950a5 100644
--- a/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
+++ b/clients/src/main/resources/common/message/DescribeLogDirsResponse.json
@@ -30,7 +30,7 @@
         "about": "The absolute log directory path." },
       { "name": "Topics", "type": "[]DescribeLogDirsTopic", "versions": "0+",
         "about": "Each topic.", "fields": [
-        { "name": "Name", "type": "string", "versions": "0+",
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
           "about": "The topic name." },
         { "name": "Partitions", "type": "[]DescribeLogDirsPartition", "versions": "0+", "fields": [
           { "name": "PartitionIndex", "type": "int32", "versions": "0+",
diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
index f566cdf..da2a423 100644
--- a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
+++ b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json
@@ -22,7 +22,7 @@
     { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+",
       "about": "The topic partitions to elect the preferred leader of.",
       "fields": [
-        { "name": "Topic", "type": "string", "versions": "0+",
+        { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
           "about": "The name of a topic." },
         { "name": "PartitionId", "type": "[]int32", "versions": "0+",
           "about": "The partitions of this topic whose preferred leader should be elected" }
@@ -30,4 +30,4 @@
     { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000",
       "about": "The time in ms to wait for the election to complete." }
   ]
-}
\ No newline at end of file
+}
diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
index 491bd03..637b2c1 100644
--- a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
+++ b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json
@@ -23,7 +23,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+",
       "about": "The election results, or an empty array if the requester did not have permission and the request asks for all partitions.", "fields": [
-      { "name": "Topic", "type": "string", "versions": "0+",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "PartitionResult", "type": "[]PartitionResult", "versions": "0+",
         "about": "The results for each partition", "fields": [
diff --git a/clients/src/main/resources/common/message/EndTxnRequest.json b/clients/src/main/resources/common/message/EndTxnRequest.json
index ebf1224..bf2db4c 100644
--- a/clients/src/main/resources/common/message/EndTxnRequest.json
+++ b/clients/src/main/resources/common/message/EndTxnRequest.json
@@ -20,9 +20,9 @@
   // Version 1 is the same as version 0.
   "validVersions": "0-1",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "0+",
+    { "name": "TransactionalId", "type": "string", "versions": "0+", "entityType": "transactionalId",
       "about": "The ID of the transaction to end." },
-    { "name": "ProducerId", "type": "int64", "versions": "0+",
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
       "about": "The producer ID." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer." },
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index ee1e88b..24c974d 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -62,7 +62,7 @@
       "about": "The fetch session ID." },
     { "name": "Topics", "type": "[]FetchableTopic", "versions": "0+",
       "about": "The topics to fetch.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The name of the topic to fetch." },
       { "name": "FetchPartitions", "type": "[]FetchPartition", "versions": "0+",
         "about": "The partitions to fetch.", "fields": [
@@ -80,7 +80,7 @@
     ]},
     { "name": "Forgotten", "type": "[]ForgottenTopic", "versions": "7+", "ignorable": false,
       "about": "In an incremental fetch request, the partitions to remove.", "fields": [
-      { "name": "Name", "type": "string", "versions": "7+",
+      { "name": "Name", "type": "string", "versions": "7+", "entityType": "topicName",
         "about": "The partition name." },
       { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
         "about": "The partitions indexes to forget." }
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index afee391..5ebc97c 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -48,7 +48,7 @@
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
     { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
         "about": "The topic partitions.", "fields": [
@@ -64,7 +64,7 @@
           "about": "The current log start offset." },
         { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
           "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+",
+          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
             "about": "The producer id associated with the aborted transaction." },
           { "name": "FirstOffset", "type": "int64", "versions": "4+",
             "about": "The first offset in the aborted transaction." }
diff --git a/clients/src/main/resources/common/message/FindCoordinatorResponse.json b/clients/src/main/resources/common/message/FindCoordinatorResponse.json
index aed8f1a..b415a01 100644
--- a/clients/src/main/resources/common/message/FindCoordinatorResponse.json
+++ b/clients/src/main/resources/common/message/FindCoordinatorResponse.json
@@ -27,7 +27,7 @@
       "about": "The error code, or 0 if there was no error." },
     { "name": "ErrorMessage", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true,
       "about": "The error message, or null if there was no error." },
-    { "name": "NodeId", "type": "int32", "versions": "0+",
+    { "name": "NodeId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The node id." },
     { "name": "Host", "type": "string", "versions": "0+",
       "about": "The host name." },
diff --git a/clients/src/main/resources/common/message/HeartbeatRequest.json b/clients/src/main/resources/common/message/HeartbeatRequest.json
index 61cb20e..85c8528 100644
--- a/clients/src/main/resources/common/message/HeartbeatRequest.json
+++ b/clients/src/main/resources/common/message/HeartbeatRequest.json
@@ -20,7 +20,7 @@
   // Version 1 and version 2 are the same as version 0.
   "validVersions": "0-2",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The group id." },
     { "name": "Generationid", "type": "int32", "versions": "0+",
       "about": "The generation of the group." },
diff --git a/clients/src/main/resources/common/message/InitProducerIdRequest.json b/clients/src/main/resources/common/message/InitProducerIdRequest.json
index c8ca110..f6c5b35 100644
--- a/clients/src/main/resources/common/message/InitProducerIdRequest.json
+++ b/clients/src/main/resources/common/message/InitProducerIdRequest.json
@@ -20,7 +20,7 @@
   // Version 1 is the same as version 0.
   "validVersions": "0-1",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+",
+    { "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",
       "about": "The transactional id, or null if the producer is not transactional." },
     { "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
       "about": "The time in ms to wait for before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." }
diff --git a/clients/src/main/resources/common/message/InitProducerIdResponse.json b/clients/src/main/resources/common/message/InitProducerIdResponse.json
index a52fc81..ea8812a 100644
--- a/clients/src/main/resources/common/message/InitProducerIdResponse.json
+++ b/clients/src/main/resources/common/message/InitProducerIdResponse.json
@@ -24,7 +24,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "ProducerId", "type": "int64", "versions": "0+",
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
       "default": -1, "about": "The current producer id." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer id." }
diff --git a/clients/src/main/resources/common/message/JoinGroupRequest.json b/clients/src/main/resources/common/message/JoinGroupRequest.json
index ec98d4a..6db24da 100644
--- a/clients/src/main/resources/common/message/JoinGroupRequest.json
+++ b/clients/src/main/resources/common/message/JoinGroupRequest.json
@@ -24,7 +24,7 @@
   // with assigned id.
   "validVersions": "0-5",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The group identifier." },
     { "name": "SessionTimeoutMs", "type": "int32", "versions": "0+",
       "about": "The coordinator considers the consumer dead if it receives no heartbeat after this timeout in milliseconds." },
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index b898835..dfbf6b5 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -22,7 +22,7 @@
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   "validVersions": "0-2",
   "fields": [
-    { "name": "ControllerId", "type": "int32", "versions": "0+",
+    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The current controller ID." },
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The current controller epoch." },
@@ -30,7 +30,7 @@
       "about": "The current broker epoch." },
     { "name": "TopicStates", "type": "[]LeaderAndIsrRequestTopicState", "versions": "2+",
       "about": "Each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "2+",
+      { "name": "Name", "type": "string", "versions": "2+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionStates", "type": "[]LeaderAndIsrRequestPartitionState", "versions": "0+",
         "about": "The state of each partition", "fields": [
@@ -38,7 +38,7 @@
           "about": "The partition index." },
         { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
           "about": "The controller epoch." },
-        { "name": "LeaderKey", "type": "int32", "versions": "0+",
+        { "name": "LeaderKey", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The broker ID of the leader." },
         { "name": "LeaderEpoch", "type": "int32", "versions": "0+",
           "about": "The leader epoch." },
@@ -54,13 +54,13 @@
     ]},
     { "name": "PartitionStatesV0", "type": "[]LeaderAndIsrRequestPartitionStateV0", "versions": "0-1",
       "about": "The state of each partition", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0-1",
+      { "name": "TopicName", "type": "string", "versions": "0-1", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0-1",
         "about": "The partition index." },
       { "name": "ControllerEpoch", "type": "int32", "versions": "0-1",
         "about": "The controller epoch." },
-      { "name": "LeaderKey", "type": "int32", "versions": "0-1",
+      { "name": "LeaderKey", "type": "int32", "versions": "0-1", "entityType": "brokerId",
         "about": "The broker ID of the leader." },
       { "name": "LeaderEpoch", "type": "int32", "versions": "0-1",
         "about": "The leader epoch." },
@@ -75,7 +75,7 @@
     ]},
     { "name": "LiveLeaders", "type": "[]LeaderAndIsrLiveLeader", "versions": "0+",
       "about": "The current live leaders.", "fields": [
-      { "name": "BrokerId", "type": "int32", "versions": "0+",
+      { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
         "about": "The leader's broker ID." },
       { "name": "HostName", "type": "string", "versions": "0+",
         "about": "The leader's hostname." },
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
index e4e1e09..8f4bf63 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrResponse.json
@@ -26,7 +26,7 @@
       "about": "The error code, or 0 if there was no error." },
     { "name": "Partitions", "type": "[]LeaderAndIsrResponsePartition", "versions": "0+",
       "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+",
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0+",
         "about": "The partition index." },
diff --git a/clients/src/main/resources/common/message/LeaveGroupRequest.json b/clients/src/main/resources/common/message/LeaveGroupRequest.json
index 9448705..7c536da 100644
--- a/clients/src/main/resources/common/message/LeaveGroupRequest.json
+++ b/clients/src/main/resources/common/message/LeaveGroupRequest.json
@@ -20,7 +20,7 @@
   // Version 1 and 2 are the same as version 0.
   "validVersions": "0-2",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The ID of the group to leave." },
     { "name": "MemberId", "type": "string", "versions": "0+",
       "about": "The member ID to remove from the group." }
diff --git a/clients/src/main/resources/common/message/ListGroupsResponse.json b/clients/src/main/resources/common/message/ListGroupsResponse.json
index 2dc83fa..502ed34 100644
--- a/clients/src/main/resources/common/message/ListGroupsResponse.json
+++ b/clients/src/main/resources/common/message/ListGroupsResponse.json
@@ -27,7 +27,7 @@
       "about": "The error code, or 0 if there was no error." },
     { "name": "Groups", "type": "[]ListedGroup", "versions": "0+",
       "about": "Each group in the response.", "fields": [
-      { "name": "GroupId", "type": "string", "versions": "0+",
+      { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
         "about": "The group ID." },
       { "name": "ProtocolType", "type": "string", "versions": "0+",
         "about": "The group protocol type." }
diff --git a/clients/src/main/resources/common/message/ListOffsetRequest.json b/clients/src/main/resources/common/message/ListOffsetRequest.json
index 8f2c738..194f6da 100644
--- a/clients/src/main/resources/common/message/ListOffsetRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetRequest.json
@@ -25,13 +25,13 @@
   // Version 5 is the same as version 4.
   "validVersions": "0-5",
   "fields": [
-    { "name": "ReplicaId", "type": "int32", "versions": "0+",
+    { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The broker ID of the requestor, or -1 if this request is being made by a normal consumer." },
     { "name": "IsolationLevel", "type": "int8", "versions": "2+",
       "about": "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED transactional records are visible. To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the result, which allows consumers to discard ABO [...]
     { "name": "Topics", "type": "[]ListOffsetTopic", "versions": "0+", 
       "about": "Each topic in the request.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]ListOffsetPartition", "versions": "0+",
         "about": "Each partition in the request.", "fields": [
diff --git a/clients/src/main/resources/common/message/ListOffsetResponse.json b/clients/src/main/resources/common/message/ListOffsetResponse.json
index 9476a19..eb691e3 100644
--- a/clients/src/main/resources/common/message/ListOffsetResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetResponse.json
@@ -29,7 +29,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]ListOffsetTopicResponse", "versions": "0+",
       "about": "Each topic in the response.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]ListOffsetPartitionResponse", "versions": "0+",
         "about": "Each partition in the response.", "fields": [
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
index 8848ac1..cbb3489 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -29,7 +29,7 @@
     // Starting in version 8, authorized operations can be requested for cluster and topic resource.
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." }
     ]},
     { "name": "AllowAutoTopicCreation", "type": "bool", "versions": "4+", "default": "true", "ignorable": false,
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json
index 2d248ab..f54ef28 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -39,7 +39,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Brokers", "type": "[]MetadataResponseBroker", "versions": "0+",
       "about": "Each broker in the response.", "fields": [
-      { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true,
+      { "name": "NodeId", "type": "int32", "versions": "0+", "mapKey": true, "entityType": "brokerId",
         "about": "The broker ID." },
       { "name": "Host", "type": "string", "versions": "0+",
         "about": "The broker hostname." },
@@ -50,13 +50,13 @@
     ]},
     { "name": "ClusterId", "type": "string", "nullableVersions": "2+", "versions": "2+", "ignorable": true, "default": "null",
       "about": "The cluster ID that responding broker belongs to." },
-    { "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
+    { "name": "ControllerId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, "entityType": "brokerId",
       "about": "The ID of the controller broker." },
     { "name": "Topics", "type": "[]MetadataResponseTopic", "versions": "0+",
       "about": "Each topic in the response.", "fields": [
       { "name": "ErrorCode", "type": "int16", "versions": "0+",
         "about": "The topic error, or 0 if there was no error." },
-      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true,
+      { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
         "about": "The topic name." },
       { "name": "IsInternal", "type": "bool", "versions": "1+", "default": "false", "ignorable": true,
         "about": "True if the topic is internal." },
@@ -66,11 +66,11 @@
           "about": "The partition error, or 0 if there was no error." },
         { "name": "PartitionIndex", "type": "int32", "versions": "0+",
           "about": "The partition index." },
-        { "name": "LeaderId", "type": "int32", "versions": "0+",
+        { "name": "LeaderId", "type": "int32", "versions": "0+", "entityType": "brokerId",
           "about": "The ID of the leader broker." },
         { "name": "LeaderEpoch", "type": "int32", "versions": "7+", "default": "-1", "ignorable": true,
           "about": "The leader epoch of this partition." },
-        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+",
+        { "name": "ReplicaNodes", "type": "[]int32", "versions": "0+", "entityType": "brokerId",
           "about": "The set of all nodes that host this partition." },
         { "name": "IsrNodes", "type": "[]int32", "versions": "0+",
           "about": "The set of nodes that are in sync with the leader for this partition." },
diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json
index ebccede..5d94a7b 100644
--- a/clients/src/main/resources/common/message/OffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json
@@ -28,7 +28,7 @@
   // Version 6 adds the leader epoch for fencing.
   "validVersions": "0-6",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The unique group identifier." },
     { "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
       "about": "The generation of the group." },
@@ -38,7 +38,7 @@
       "about": "The time period in ms to retain the offset." },
     { "name": "Topics", "type": "[]OffsetCommitRequestTopic", "versions": "0+",
       "about": "The topics to commit offsets for.",  "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]OffsetCommitRequestPartition", "versions": "0+",
         "about": "Each partition to commit offsets for.", "fields": [
diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json
index 39daa56..d23fd13 100644
--- a/clients/src/main/resources/common/message/OffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json
@@ -30,7 +30,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]OffsetCommitResponseTopic", "versions": "0+",
       "about": "The responses for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]OffsetCommitResponsePartition", "versions": "0+",
         "about": "The responses for each partition in the topic.",  "fields": [
diff --git a/clients/src/main/resources/common/message/OffsetFetchRequest.json b/clients/src/main/resources/common/message/OffsetFetchRequest.json
index e634f7c..4ff781b 100644
--- a/clients/src/main/resources/common/message/OffsetFetchRequest.json
+++ b/clients/src/main/resources/common/message/OffsetFetchRequest.json
@@ -25,11 +25,12 @@
   // Version 3, 4, and 5 are the same as version 2.
   "validVersions": "0-5",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The group to fetch offsets for." },
     { "name": "Topics", "type": "[]OffsetFetchRequestTopic", "versions": "0+", "nullableVersions": "2+",
       "about": "Each topic we would like to fetch offsets for, or null to fetch offsets for all topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+" },
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+        "about": "The topic name."},
       { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
         "about": "The partition indexes we would like to fetch offsets for." }
     ]}
diff --git a/clients/src/main/resources/common/message/OffsetFetchResponse.json b/clients/src/main/resources/common/message/OffsetFetchResponse.json
index 70fd277..eb0bbbc 100644
--- a/clients/src/main/resources/common/message/OffsetFetchResponse.json
+++ b/clients/src/main/resources/common/message/OffsetFetchResponse.json
@@ -32,7 +32,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]OffsetFetchResponseTopic", "versions": "0+", 
       "about": "The responses per topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]OffsetFetchResponsePartition", "versions": "0+",
         "about": "The responses per partition", "fields": [
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index 40227ed..4104938 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -24,7 +24,7 @@
   "fields": [
     { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
       "about": "Each topic to get offsets for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]OffsetForLeaderPartition", "versions": "0+",
         "about": "Each partition to get offsets for.", "fields": [
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
index 26bd490..8e93422 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
@@ -25,7 +25,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]OffsetForLeaderTopicResult", "versions": "0+",
       "about": "Each topic we fetched offsets for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]OffsetForLeaderPartitionResult", "versions": "0+",
         "about": "Each partition in the topic we fetched offsets for.", "fields": [
diff --git a/clients/src/main/resources/common/message/ProduceRequest.json b/clients/src/main/resources/common/message/ProduceRequest.json
index 4f35db2..2da4ed7 100644
--- a/clients/src/main/resources/common/message/ProduceRequest.json
+++ b/clients/src/main/resources/common/message/ProduceRequest.json
@@ -30,7 +30,7 @@
   // Starting in version 7, records can be produced using ZStandard compression.  See KIP-110.
   "validVersions": "0-7",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
       "about": "The transactional ID, or null if the producer is not transactional." },
     { "name": "Acks", "type": "int16", "versions": "0+",
       "about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." },
@@ -38,7 +38,7 @@
       "about": "The timeout to await a response in milliseconds." },
     { "name": "Topics", "type": "[]TopicProduceData", "versions": "0+", 
       "about": "Each topic to produce to.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]PartitionProduceData", "versions": "0+",
         "about": "Each partition to produce to.", "fields": [
diff --git a/clients/src/main/resources/common/message/ProduceResponse.json b/clients/src/main/resources/common/message/ProduceResponse.json
index 14d38aa..0659b4c 100644
--- a/clients/src/main/resources/common/message/ProduceResponse.json
+++ b/clients/src/main/resources/common/message/ProduceResponse.json
@@ -31,7 +31,7 @@
   "fields": [
     { "name": "Responses", "type": "[]TopicProduceResponse", "versions": "0+",
       "about": "Each produce response", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name" },
       { "name": "Partitions", "type": "[]PartitionProduceResponse", "versions": "0+",
         "about": "Each partition that we produced to within the topic.", "fields": [
diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json b/clients/src/main/resources/common/message/StopReplicaRequest.json
index dffa11d..12c3d7e 100644
--- a/clients/src/main/resources/common/message/StopReplicaRequest.json
+++ b/clients/src/main/resources/common/message/StopReplicaRequest.json
@@ -21,7 +21,7 @@
   // per topic.
   "validVersions": "0-1",
   "fields": [
-    { "name": "ControllerId", "type": "int32", "versions": "0+",
+    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The controller id." },
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The controller epoch." },
@@ -31,14 +31,14 @@
       "about": "Whether these partitions should be deleted." },
     { "name": "PartitionsV0", "type": "[]StopReplicaRequestPartitionV0", "versions": "0",
       "about": "The partitions to stop.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0",
+      { "name": "TopicName", "type": "string", "versions": "0", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0",
         "about": "The partition index." }
     ]},
     { "name": "Topics", "type": "[]StopReplicaRequestTopic", "versions": "1+",
       "about": "The topics to stop.", "fields": [
-      { "name": "Name", "type": "string", "versions": "1+",
+      { "name": "Name", "type": "string", "versions": "1+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndexes", "type": "[]int32", "versions": "1+",
         "about": "The partition indexes." }
diff --git a/clients/src/main/resources/common/message/StopReplicaResponse.json b/clients/src/main/resources/common/message/StopReplicaResponse.json
index 55daac5..962cacb 100644
--- a/clients/src/main/resources/common/message/StopReplicaResponse.json
+++ b/clients/src/main/resources/common/message/StopReplicaResponse.json
@@ -24,7 +24,7 @@
       "about": "The top-level error code, or 0 if there was no top-level error." },
     { "name": "Partitions", "type": "[]StopReplicaResponsePartition", "versions": "0+",
       "about": "The responses for each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+",
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0+",
         "about": "The partition index." },
diff --git a/clients/src/main/resources/common/message/SyncGroupRequest.json b/clients/src/main/resources/common/message/SyncGroupRequest.json
index ec910a0..00249b6 100644
--- a/clients/src/main/resources/common/message/SyncGroupRequest.json
+++ b/clients/src/main/resources/common/message/SyncGroupRequest.json
@@ -20,7 +20,7 @@
   // Versions 1 and 2 are the same as version 0.
   "validVersions": "0-2",
   "fields": [
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The unique group identifier." },
     { "name": "GenerationId", "type": "int32", "versions": "0+",
       "about": "The generation of the group." },
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
index 357d1d0..a7b7d05 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitRequest.json
@@ -24,15 +24,15 @@
   "fields": [
     { "name": "TransactionalId", "type": "string", "versions": "0+",
       "about": "The ID of the transaction." },
-    { "name": "GroupId", "type": "string", "versions": "0+",
+    { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
       "about": "The ID of the group." },
-    { "name": "ProducerId", "type": "int64", "versions": "0+",
+    { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
       "about": "The current producer ID in use by the transactional ID." },
     { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
       "about": "The current epoch associated with the producer ID." },
     { "name": "Topics", "type" : "[]TxnOffsetCommitRequestTopic", "versions": "0+",
       "about": "Each topic that we want to commit offsets for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]TxnOffsetCommitRequestPartition", "versions": "0+",
         "about": "The partitions inside the topic that we want to commit offsets for.", "fields": [
diff --git a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
index 58667cb..4a13800 100644
--- a/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
+++ b/clients/src/main/resources/common/message/TxnOffsetCommitResponse.json
@@ -25,7 +25,7 @@
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
     { "name": "Topics", "type": "[]TxnOffsetCommitResponseTopic", "versions": "0+",
       "about": "The responses for each topic.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+",
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "Partitions", "type": "[]TxnOffsetCommitResponsePartition", "versions": "0+",
         "about": "The responses for each partition in the topic.", "fields": [
diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index 07e0f03..f23e2de 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -28,7 +28,7 @@
   // Version 5 adds the broker epoch field and normalizes partitions by topic.
   "validVersions": "0-5",
   "fields": [
-    { "name": "ControllerId", "type": "int32", "versions": "0+",
+    { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
       "about": "The controller id." },
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The controller epoch." },
@@ -36,7 +36,7 @@
       "about": "The broker epoch." },
     { "name": "TopicStates", "type": "[]UpdateMetadataRequestTopicState", "versions": "5+",
       "about": "Each topic that we would like to update.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+",
+      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionStates", "type": "[]UpdateMetadataPartitionState", "versions": "5+",
         "about": "The partition that we would like to update.", "fields": [
@@ -44,15 +44,15 @@
           "about": "The partition index." },
         { "name": "ControllerEpoch", "type": "int32", "versions": "5+",
           "about": "The controller epoch." },
-        { "name": "Leader", "type": "int32", "versions": "5+",
+        { "name": "Leader", "type": "int32", "versions": "5+", "entityType": "brokerId",
           "about": "The ID of the broker which is the current partition leader." },
         { "name": "LeaderEpoch", "type": "int32", "versions": "5+",
           "about": "The leader epoch of this partition." },
-        { "name": "Isr", "type": "[]int32", "versions": "5+",
+        { "name": "Isr", "type": "[]int32", "versions": "5+", "entityType": "brokerId",
           "about": "The brokers which are in the ISR for this partition." },
         { "name": "ZkVersion", "type": "int32", "versions": "5+",
           "about": "The Zookeeper version." },
-        { "name": "Replicas", "type": "[]int32", "versions": "5+",
+        { "name": "Replicas", "type": "[]int32", "versions": "5+", "entityType": "brokerId",
           "about": "All the replicas of this partition." },
         { "name": "OfflineReplicas", "type": "[]int32", "versions": "5+",
           "about": "The replicas of this partition which are offline." }
@@ -60,27 +60,28 @@
     ]},
     { "name": "PartitionStatesV0", "type": "[]UpdateMetadataRequestPartitionStateV0", "versions": "0-4",
       "about": "Each partition that we would like to update.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0-4",
+      { "name": "TopicName", "type": "string", "versions": "0-4", "entityType": "topicName",
         "about": "The topic name." },
       { "name": "PartitionIndex", "type": "int32", "versions": "0-4",
         "about": "The partition index." },
       { "name": "ControllerEpoch", "type": "int32", "versions": "0-4",
         "about": "The controller epoch." },
-      { "name": "Leader", "type": "int32", "versions": "0-4",
+      { "name": "Leader", "type": "int32", "versions": "0-4", "entityType": "brokerId",
         "about": "The ID of the broker which is the current partition leader." },
       { "name": "LeaderEpoch", "type": "int32", "versions": "0-4",
         "about": "The leader epoch of this partition." },
-      { "name": "Isr", "type": "[]int32", "versions": "0-4",
+      { "name": "Isr", "type": "[]int32", "versions": "0-4", "entityType": "brokerId",
         "about": "The brokers which are in the ISR for this partition." },
       { "name": "ZkVersion", "type": "int32", "versions": "0-4",
         "about": "The Zookeeper version." },
-      { "name": "Replicas", "type": "[]int32", "versions": "0-4",
+      { "name": "Replicas", "type": "[]int32", "versions": "0-4", "entityType": "brokerId",
         "about": "All the replicas of this partition." },
-      { "name": "OfflineReplicas", "type": "[]int32", "versions": "4",
+      { "name": "OfflineReplicas", "type": "[]int32", "versions": "4", "entityType": "brokerId",
         "about": "The replicas of this partition which are offline." }
     ]},
     { "name": "Brokers", "type": "[]UpdateMetadataRequestBroker", "versions": "0+", "fields": [
-        { "name": "Id", "type": "int32", "versions": "0+" },
+        { "name": "Id", "type": "int32", "versions": "0+", "entityType": "brokerId",
+          "about": "The broker id." },
         // Version 0 of the protocol only allowed specifying a single host and
         // port per broker, rather than an array of endpoints.
         { "name": "V0Host", "type": "string", "versions": "0", "ignorable": true,
diff --git a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
index 89868fc..63a7320 100644
--- a/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
+++ b/clients/src/main/resources/common/message/WriteTxnMarkersRequest.json
@@ -21,7 +21,7 @@
   "fields": [
     { "name": "Markers", "type": "[]WritableTxnMarker", "versions": "0+",
       "about": "The transaction markers to be written.", "fields": [
-      { "name": "ProducerId", "type": "int64", "versions": "0+",
+      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
         "about": "The current producer ID."},
       { "name": "ProducerEpoch", "type": "int16", "versions": "0+",
         "about": "The current epoch associated with the producer ID." },
@@ -29,7 +29,7 @@
         "about": "The result of the transaction to write to the partitions (false = ABORT, true = COMMIT)." },
       { "name": "Topics", "type": "[]WritableTxnMarkerTopic", "versions": "0+",
         "about": "Each topic that we want to write transaction marker(s) for.", "fields": [
-        { "name": "Name", "type": "string", "versions": "0+",
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
           "about": "The topic name." },
         { "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
           "about": "The indexes of the partitions to write transaction markers for." }
diff --git a/clients/src/main/resources/common/message/WriteTxnMarkersResponse.json b/clients/src/main/resources/common/message/WriteTxnMarkersResponse.json
index ca08054..a1abee8 100644
--- a/clients/src/main/resources/common/message/WriteTxnMarkersResponse.json
+++ b/clients/src/main/resources/common/message/WriteTxnMarkersResponse.json
@@ -21,11 +21,11 @@
   "fields": [
     { "name": "Markers", "type": "[]WritableTxnMarkerResult", "versions": "0+",
-      "about": "The results for writing makers.", "fields": [
+      "about": "The results for writing markers.", "fields": [
-      { "name": "ProducerId", "type": "int64", "versions": "0+",
+      { "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
         "about": "The current producer ID in use by the transactional ID." },
       { "name": "Topics", "type": "[]WritableTxnMarkerTopicResult", "versions": "0+",
         "about": "The results by topic.", "fields": [
-        { "name": "Name", "type": "string", "versions": "0+",
+        { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
           "about": "The topic name." },
         { "name": "Partitions", "type": "[]WritableTxnMarkerPartitionResult", "versions": "0+",
           "about": "The results by partition.", "fields": [
diff --git a/generator/src/main/java/org/apache/kafka/message/EntityType.java b/generator/src/main/java/org/apache/kafka/message/EntityType.java
new file mode 100644
index 0000000..225c987
--- /dev/null
+++ b/generator/src/main/java/org/apache/kafka/message/EntityType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.message;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The kind of entity that an RPC field refers to. Every kind other than
+ * UNKNOWN is tied to the base field type such a field must be declared with.
+ */
+public enum EntityType {
+    @JsonProperty("unknown") UNKNOWN(null),
+    @JsonProperty("transactionalId") TRANSACTIONAL_ID(FieldType.StringFieldType.INSTANCE),
+    @JsonProperty("producerId") PRODUCER_ID(FieldType.Int64FieldType.INSTANCE),
+    @JsonProperty("groupId") GROUP_ID(FieldType.StringFieldType.INSTANCE),
+    @JsonProperty("topicName") TOPIC_NAME(FieldType.StringFieldType.INSTANCE),
+    @JsonProperty("brokerId") BROKER_ID(FieldType.Int32FieldType.INSTANCE);
+
+    /** The field type required for this entity type; null only for UNKNOWN. */
+    private final FieldType baseType;
+
+    EntityType(FieldType baseType) {
+        this.baseType = baseType;
+    }
+
+    /**
+     * Checks that a field declared with this entity type has a matching type.
+     * UNKNOWN accepts any type; arrays are judged by their element type.
+     *
+     * @param fieldName the field name, used only to build the error message
+     * @param type      the declared type of the field
+     * @throws RuntimeException if the type's string form differs from the base type's
+     */
+    public void verifyTypeMatches(String fieldName, FieldType type) {
+        if (this == UNKNOWN) {
+            return;
+        }
+        FieldType elementType = type;
+        while (elementType instanceof FieldType.ArrayType) {
+            elementType = ((FieldType.ArrayType) elementType).elementType();
+        }
+        if (!elementType.toString().equals(baseType.toString())) {
+            throw new RuntimeException("Field " + fieldName + " has entity type " + name() +
+                ", but field type " + elementType.toString() + ", which does not match.");
+        }
+    }
+}
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
index 76ea12a..3652aba 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldSpec.java
@@ -42,6 +42,8 @@ public final class FieldSpec {
 
     private final boolean ignorable;
 
+    private final EntityType entityType;
+
     private final String about;
 
     @JsonCreator
@@ -53,6 +55,7 @@ public final class FieldSpec {
                      @JsonProperty("nullableVersions") String nullableVersions,
                      @JsonProperty("default") String fieldDefault,
                      @JsonProperty("ignorable") boolean ignorable,
+                     @JsonProperty("entityType") EntityType entityType,
                      @JsonProperty("about") String about) {
         this.name = Objects.requireNonNull(name);
         this.versions = Versions.parse(versions, null);
@@ -72,6 +75,9 @@ public final class FieldSpec {
         }
         this.fieldDefault = fieldDefault == null ? "" : fieldDefault;
         this.ignorable = ignorable;
+        this.entityType = (entityType == null) ? EntityType.UNKNOWN : entityType;
+        this.entityType.verifyTypeMatches(name, this.type);
+
         this.about = about == null ? "" : about;
         if (!this.fields().isEmpty()) {
             if (!this.type.isArray()) {
diff --git a/generator/src/main/java/org/apache/kafka/message/FieldType.java b/generator/src/main/java/org/apache/kafka/message/FieldType.java
index 4534055..c920b8a 100644
--- a/generator/src/main/java/org/apache/kafka/message/FieldType.java
+++ b/generator/src/main/java/org/apache/kafka/message/FieldType.java
@@ -23,7 +23,7 @@ public interface FieldType {
     String STRUCT_PREFIX = "[]";
 
     final class BoolFieldType implements FieldType {
-        private static final BoolFieldType INSTANCE = new BoolFieldType();
+        static final BoolFieldType INSTANCE = new BoolFieldType();
         private static final String NAME = "bool";
 
         @Override
@@ -38,7 +38,7 @@ public interface FieldType {
     }
 
     final class Int8FieldType implements FieldType {
-        private static final Int8FieldType INSTANCE = new Int8FieldType();
+        static final Int8FieldType INSTANCE = new Int8FieldType();
         private static final String NAME = "int8";
 
         @Override
@@ -53,7 +53,7 @@ public interface FieldType {
     }
 
     final class Int16FieldType implements FieldType {
-        private static final Int16FieldType INSTANCE = new Int16FieldType();
+        static final Int16FieldType INSTANCE = new Int16FieldType();
         private static final String NAME = "int16";
 
         @Override
@@ -68,7 +68,7 @@ public interface FieldType {
     }
 
     final class Int32FieldType implements FieldType {
-        private static final Int32FieldType INSTANCE = new Int32FieldType();
+        static final Int32FieldType INSTANCE = new Int32FieldType();
         private static final String NAME = "int32";
 
         @Override
@@ -83,7 +83,7 @@ public interface FieldType {
     }
 
     final class Int64FieldType implements FieldType {
-        private static final Int64FieldType INSTANCE = new Int64FieldType();
+        static final Int64FieldType INSTANCE = new Int64FieldType();
         private static final String NAME = "int64";
 
         @Override
@@ -98,7 +98,7 @@ public interface FieldType {
     }
 
     final class StringFieldType implements FieldType {
-        private static final StringFieldType INSTANCE = new StringFieldType();
+        static final StringFieldType INSTANCE = new StringFieldType();
         private static final String NAME = "string";
 
         @Override
@@ -118,7 +118,7 @@ public interface FieldType {
     }
 
     final class BytesFieldType implements FieldType {
-        private static final BytesFieldType INSTANCE = new BytesFieldType();
+        static final BytesFieldType INSTANCE = new BytesFieldType();
         private static final String NAME = "bytes";
 
         @Override
diff --git a/generator/src/test/java/org/apache/kafka/message/EntityTypeTest.java b/generator/src/test/java/org/apache/kafka/message/EntityTypeTest.java
new file mode 100644
index 0000000..075a448
--- /dev/null
+++ b/generator/src/test/java/org/apache/kafka/message/EntityTypeTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.message;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import static org.junit.Assert.fail;
+
+public class EntityTypeTest {
+    @Rule
+    public final Timeout globalTimeout = Timeout.millis(120000);
+
+    /** UNKNOWN places no constraint on the field, so every type must pass. */
+    @Test
+    public void testUnknownEntityType() {
+        for (FieldType type : new FieldType[] {
+            FieldType.BoolFieldType.INSTANCE,
+            FieldType.Int8FieldType.INSTANCE,
+            FieldType.Int16FieldType.INSTANCE,
+            FieldType.Int32FieldType.INSTANCE,
+            FieldType.Int64FieldType.INSTANCE,
+            FieldType.StringFieldType.INSTANCE,
+            FieldType.BytesFieldType.INSTANCE,
+            new FieldType.ArrayType(FieldType.StringFieldType.INSTANCE)}) {
+            EntityType.UNKNOWN.verifyTypeMatches("unknown", type);
+        }
+    }
+
+    /** Checks that both the plain type and an array of it are accepted. */
+    private static void expectMatch(EntityType entityType, String fieldName, FieldType type) {
+        entityType.verifyTypeMatches(fieldName, type);
+        entityType.verifyTypeMatches(fieldName, new FieldType.ArrayType(type));
+    }
+
+    @Test
+    public void testVerifyTypeMatches() {
+        expectMatch(EntityType.TRANSACTIONAL_ID, "transactionalIdField",
+            FieldType.StringFieldType.INSTANCE);
+        expectMatch(EntityType.PRODUCER_ID, "producerIdField",
+            FieldType.Int64FieldType.INSTANCE);
+        expectMatch(EntityType.GROUP_ID, "groupIdField",
+            FieldType.StringFieldType.INSTANCE);
+        expectMatch(EntityType.TOPIC_NAME, "topicNameField",
+            FieldType.StringFieldType.INSTANCE);
+        expectMatch(EntityType.BROKER_ID, "brokerIdField",
+            FieldType.Int32FieldType.INSTANCE);
+    }
+
+    /** Runs r and fails the test unless it throws a RuntimeException. */
+    private static void expectException(Runnable r) {
+        try {
+            r.run();
+        } catch (RuntimeException expected) {
+            return; // the exception is the outcome the caller asked for
+        }
+        fail("expected an exception");
+    }
+
+    @Test
+    public void testVerifyTypeMismatches() {
+        expectException(() -> EntityType.TRANSACTIONAL_ID.verifyTypeMatches(
+            "transactionalIdField", FieldType.Int32FieldType.INSTANCE));
+        expectException(() -> EntityType.PRODUCER_ID.verifyTypeMatches(
+            "producerIdField", FieldType.StringFieldType.INSTANCE));
+        expectException(() -> EntityType.GROUP_ID.verifyTypeMatches(
+            "groupIdField", FieldType.Int8FieldType.INSTANCE));
+        expectException(() -> EntityType.TOPIC_NAME.verifyTypeMatches(
+            "topicNameField", new FieldType.ArrayType(FieldType.Int64FieldType.INSTANCE)));
+        expectException(() -> EntityType.BROKER_ID.verifyTypeMatches(
+            "brokerIdField", FieldType.Int64FieldType.INSTANCE));
+    }
+}