Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 20:24:00 UTC

[jira] [Commented] (KAFKA-2617) Move protocol field default values to Protocol

    [ https://issues.apache.org/jira/browse/KAFKA-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301913#comment-16301913 ] 

ASF GitHub Bot commented on KAFKA-2617:
---------------------------------------

guozhangwang closed pull request #338: KAFKA-2617: Move protocol field default values to Protocol.
URL: https://github.com/apache/kafka/pull/338
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

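For readers skimming the diff below, the core pattern of the change is small: a field's default value is declared once on the protocol schema (the four-argument Field constructor takes the default as its last argument), and the request/response classes read it back from the schema instead of keeping their own DEFAULT_* constants. The following is a minimal, self-contained sketch of that pattern using the same Field/Schema API the diff touches; the class name DefaultValueSketch is illustrative only and not part of the pull request:

    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;

    import static org.apache.kafka.common.protocol.types.Type.INT32;

    public class DefaultValueSketch {
        public static void main(String[] args) {
            // The default (-1) is declared together with the field definition,
            // as Protocol.java does for replica_id, retention_time, etc. in this PR.
            Schema schema = new Schema(
                    new Field("replica_id", INT32,
                            "Broker id of the follower. For normal consumers, use -1.", -1));

            // Callers read the default back from the schema
            // rather than from a hard-coded constant in a request class.
            int defaultReplicaId = (int) schema.get("replica_id").defaultValue;
            System.out.println("default replica_id = " + defaultReplicaId);
        }
    }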

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 98193e829ad..5b9a8d0bc5c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -102,8 +102,8 @@ public Coordinator(ConsumerNetworkClient client,
                        long autoCommitIntervalMs) {
         this.client = client;
         this.time = time;
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+        this.generation = OffsetCommitRequest.getDefaultGenerationId();
+        this.consumerId = JoinGroupRequest.getDefaultConsumerId();
         this.groupId = groupId;
         this.consumerCoordinator = null;
         this.subscriptions = subscriptions;
@@ -353,7 +353,7 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<Void> future) {
                 future.complete(null);
             } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()) {
                 // reset the consumer id and retry immediately
-                Coordinator.this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                Coordinator.this.consumerId = JoinGroupRequest.getDefaultConsumerId();
                 log.info("Attempt to join group {} failed due to unknown consumer id, resetting and retrying.",
                         groupId);
                 future.raise(Errors.UNKNOWN_CONSUMER_ID);
@@ -453,8 +453,8 @@ private void maybeAutoCommitOffsetsSync() {
      * Reset the generation/consumerId tracked by this consumer.
      */
     public void resetGeneration() {
-        this.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID;
-        this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+        this.generation = OffsetCommitRequest.getDefaultGenerationId();
+        this.consumerId = JoinGroupRequest.getDefaultConsumerId();
     }
 
     /**
@@ -483,7 +483,7 @@ public void resetGeneration() {
         OffsetCommitRequest req = new OffsetCommitRequest(this.groupId,
                 this.generation,
                 this.consumerId,
-                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
+                OffsetCommitRequest.getDefaultRetentionId(),
                 offsetData);
 
         return client.send(consumerCoordinator, ApiKeys.OFFSET_COMMIT, req)
@@ -720,7 +720,7 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_CONSUMER_ID.code()) {
                 log.info("Attempt to heart beat failed since consumer id is not valid, reset it and try to re-join group.");
-                consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID;
+                consumerId = JoinGroupRequest.getDefaultConsumerId();
                 subscriptions.needReassignment();
                 future.raise(Errors.UNKNOWN_CONSUMER_ID);
             } else {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 9f8e9817437..9f4a3de8030 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 
+import java.nio.ByteBuffer;
+
 import static org.apache.kafka.common.protocol.types.Type.*;
 
 public class Protocol {
@@ -130,6 +132,9 @@
                                                                                new Field("offset",
                                                                                          INT64,
                                                                                          "Message offset to be committed."),
+                                                                               new Field("timestamp",
+                                                                                         INT64,
+                                                                                         "Timestamp of the commit", -1L),
                                                                                new Field("metadata",
                                                                                          STRING,
                                                                                          "Any associated metadata the client wants to keep."));
@@ -142,7 +147,7 @@
                                                                                          "Message offset to be committed."),
                                                                                new Field("timestamp",
                                                                                          INT64,
-                                                                                         "Timestamp of the commit"),
+                                                                                         "Timestamp of the commit", -1L),
                                                                                new Field("metadata",
                                                                                          STRING,
                                                                                          "Any associated metadata the client wants to keep."));
@@ -203,13 +208,13 @@
                                                                                "The consumer group id."),
                                                                      new Field("group_generation_id",
                                                                                INT32,
-                                                                               "The generation of the consumer group."),
+                                                                               "The generation of the consumer group.", -1),
                                                                      new Field("consumer_id",
                                                                                STRING,
-                                                                               "The consumer id assigned by the group coordinator."),
+                                                                               "The consumer id assigned by the group coordinator.", ""),
                                                                      new Field("retention_time",
                                                                                INT64,
-                                                                               "Time period in ms to retain the offset."),
+                                                                               "Time period in ms to retain the offset.", -1L),
                                                                      new Field("topics",
                                                                                new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2),
                                                                                "Topics to commit offsets."));
@@ -265,10 +270,10 @@
                                                                                          "Topic partition id."),
                                                                                new Field("offset",
                                                                                          INT64,
-                                                                                         "Last committed message offset."),
+                                                                                         "Last committed message offset.", -1L),
                                                                                new Field("metadata",
                                                                                          STRING,
-                                                                                         "Any associated metadata the client wants to keep."),
+                                                                                         "Any associated metadata the client wants to keep.", ""),
                                                                                new Field("error_code", INT16));
 
     public static final Schema OFFSET_FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
@@ -343,7 +348,7 @@
 
     public static final Schema FETCH_REQUEST_V0 = new Schema(new Field("replica_id",
                                                                        INT32,
-                                                                       "Broker id of the follower. For normal consumers, use -1."),
+                                                                       "Broker id of the follower. For normal consumers, use -1.", -1),
                                                              new Field("max_wait_time",
                                                                        INT32,
                                                                        "Maximum time in ms to wait for the response."),
@@ -363,14 +368,21 @@
                                                                         new Field("error_code", INT16),
                                                                         new Field("high_watermark",
                                                                                   INT64,
-                                                                                  "Last committed offset."),
-                                                                        new Field("record_set", BYTES));
+                                                                                  "Last committed offset.", -1L),
+                                                                        new Field("record_set",
+                                                                                  BYTES,
+                                                                                  "Partition bytes.", ByteBuffer.allocate(0)));
 
     public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING),
                                                                     new Field("partition_responses",
                                                                               new ArrayOf(FETCH_RESPONSE_PARTITION_V0)));
 
-    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses",
+    public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("throttle_time_ms",
+                                                                        INT32,
+                                                                        "Duration in milliseconds for which the request was throttled" +
+                                                                                " due to quota violation. (Zero if the request did not violate any quota.)",
+                                                                        0),
+                                                              new Field("responses",
                                                                         new ArrayOf(FETCH_RESPONSE_TOPIC_V0)));
     public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms",
                                                                         INT32,
@@ -427,7 +439,7 @@
                                                                             "An array of topics to subscribe to."),
                                                                   new Field("consumer_id",
                                                                             STRING,
-                                                                            "The assigned consumer id or an empty string for a new consumer."),
+                                                                            "The assigned consumer id or an empty string for a new consumer.", ""),
                                                                   new Field("partition_assignment_strategy",
                                                                             STRING,
                                                                             "The strategy for the coordinator to assign partitions."));
@@ -437,10 +449,10 @@
     public static final Schema JOIN_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16),
                                                                    new Field("group_generation_id",
                                                                              INT32,
-                                                                             "The generation of the consumer group."),
+                                                                             "The generation of the consumer group.", -1),
                                                                    new Field("consumer_id",
                                                                              STRING,
-                                                                             "The consumer id assigned by the group coordinator."),
+                                                                             "The consumer id assigned by the group coordinator.", ""),
                                                                    new Field("assigned_partitions",
                                                                              new ArrayOf(JOIN_GROUP_RESPONSE_TOPIC_V0)));
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index 3a14ac0fb35..0154037c4e1 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -121,6 +121,16 @@ public Field get(String name) {
         return this.fields;
     }
 
+    /**
+     * Get the nested schema of an array field by its name
+     *
+     * @param name The name of the array field
+     * @return The nested schema
+     */
+    public Schema getNestedSchema(String name) {
+        return (Schema) ((ArrayOf) this.fieldsByName.get(name).type).type();
+    }
+
     /**
      * Display a string representation of the schema
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index feb4109bf8b..ec42a425e89 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -28,7 +28,6 @@
 
 public class FetchRequest extends AbstractRequest {
     
-    public static final int CONSUMER_REPLICA_ID = -1;
     private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
     private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
@@ -63,7 +62,7 @@ public PartitionData(long offset, int maxBytes) {
      * Create a non-replica fetch request
      */
     public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+        this((int) CURRENT_SCHEMA.get(REPLICA_ID_KEY_NAME).defaultValue, maxWait, minBytes, fetchData);
     }
 
     /**
@@ -125,8 +124,8 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
 
         for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
             FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
-                    FetchResponse.INVALID_HIGHWATERMARK,
-                    FetchResponse.EMPTY_RECORD_SET);
+                    FetchResponse.getInvalidHighWatermark(),
+                    FetchResponse.getEmptyRecordSet());
             responseData.put(entry.getKey(), partitionResponse);
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 7b7841579c7..4e9bbcf0604 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -46,9 +46,6 @@
     private static final String PARTITION_KEY_NAME = "partition";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
-    // Default throttle time
-    private static final int DEFAULT_THROTTLE_TIME = 0;
-
   /**
      * Possible error code:
      *
@@ -62,9 +59,6 @@
     private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
     private static final String RECORD_SET_KEY_NAME = "record_set";
 
-    public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
-
     private final Map<TopicPartition, PartitionData> responseData;
     private final int throttleTime;
 
@@ -88,7 +82,7 @@ public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
         super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0)));
         initCommonFields(responseData);
         this.responseData = responseData;
-        this.throttleTime = DEFAULT_THROTTLE_TIME;
+        this.throttleTime = (int) CURRENT_SCHEMA.get(THROTTLE_TIME_KEY_NAME).defaultValue;
     }
 
   /**
@@ -120,7 +114,8 @@ public FetchResponse(Struct struct) {
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
-        this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
+        this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME)
+                : (int) CURRENT_SCHEMA.get(THROTTLE_TIME_KEY_NAME).defaultValue;
     }
 
     private void initCommonFields(Map<TopicPartition, PartitionData> responseData) {
@@ -162,4 +157,14 @@ public static FetchResponse parse(ByteBuffer buffer) {
     public static FetchResponse parse(ByteBuffer buffer, int version) {
         return new FetchResponse((Struct) ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
     }
+
+    public static long getInvalidHighWatermark() {
+        return (long) CURRENT_SCHEMA.getNestedSchema(RESPONSES_KEY_NAME).getNestedSchema(PARTITIONS_KEY_NAME)
+                .get(HIGH_WATERMARK_KEY_NAME).defaultValue;
+    }
+
+    public static ByteBuffer getEmptyRecordSet() {
+        return (ByteBuffer) CURRENT_SCHEMA.getNestedSchema(RESPONSES_KEY_NAME).getNestedSchema(PARTITIONS_KEY_NAME)
+                .get(RECORD_SET_KEY_NAME).defaultValue;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 1ffe0760b40..a69d7f7f1bb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -33,7 +33,8 @@
     private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
     private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
 
-    public static final String UNKNOWN_CONSUMER_ID = "";
+    private static final Schema CURRENT_RESPONSE_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
 
     private final String groupId;
     private final int sessionTimeout;
@@ -73,8 +74,8 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
             case 0:
                 return new JoinGroupResponse(
                         Errors.forException(e).code(),
-                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
+                        (int) CURRENT_RESPONSE_SCHEMA.get(GENERATION_ID_KEY_NAME).defaultValue,
+                        CURRENT_RESPONSE_SCHEMA.get(CONSUMER_ID_KEY_NAME).defaultValue.toString(),
                         Collections.<TopicPartition>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
@@ -109,4 +110,8 @@ public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
     public static JoinGroupRequest parse(ByteBuffer buffer) {
         return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    public static String getDefaultConsumerId() {
+        return CURRENT_SCHEMA.get(CONSUMER_ID_KEY_NAME).defaultValue.toString();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 7bf544ef170..b8ddad00c36 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -44,9 +44,6 @@
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partitions";
 
-    public static final int UNKNOWN_GENERATION_ID = -1;
-    public static final String UNKNOWN_CONSUMER_ID = "";
-
     private final short errorCode;
     private final int generationId;
     private final String consumerId;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 03df1e780c0..694dea085ff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -16,6 +16,7 @@
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -50,16 +51,6 @@
     @Deprecated
     private static final String TIMESTAMP_KEY_NAME = "timestamp";         // for v0, v1
 
-    // default values for the current version
-    public static final int DEFAULT_GENERATION_ID = -1;
-    public static final String DEFAULT_CONSUMER_ID = "";
-    public static final long DEFAULT_RETENTION_TIME = -1L;
-
-    // default values for old versions,
-    // will be removed after these versions are deprecated
-    @Deprecated
-    public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
-
     private final String groupId;
     private final String consumerId;
     private final int generationId;
@@ -81,7 +72,7 @@ public PartitionData(long offset, long timestamp, String metadata) {
         }
 
         public PartitionData(long offset, String metadata) {
-            this(offset, DEFAULT_TIMESTAMP, metadata);
+            this(offset, getDefaultTimestamp(), metadata);
         }
     }
 
@@ -96,9 +87,9 @@ public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> of
 
         initCommonFields(groupId, offsetData);
         this.groupId = groupId;
-        this.generationId = DEFAULT_GENERATION_ID;
-        this.consumerId = DEFAULT_CONSUMER_ID;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
+        this.generationId = (int) CURRENT_SCHEMA.get(GENERATION_ID_KEY_NAME).defaultValue;
+        this.consumerId = CURRENT_SCHEMA.get(CONSUMER_ID_KEY_NAME).defaultValue.toString();
+        this.retentionTime = (long) CURRENT_SCHEMA.get(RETENTION_TIME_KEY_NAME).defaultValue;
         this.offsetData = offsetData;
     }
 
@@ -119,7 +110,7 @@ public OffsetCommitRequest(String groupId, int generationId, String consumerId,
         this.groupId = groupId;
         this.generationId = generationId;
         this.consumerId = consumerId;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
+        this.retentionTime = (long) CURRENT_SCHEMA.get(RETENTION_TIME_KEY_NAME).defaultValue;
         this.offsetData = offsetData;
     }
 
@@ -180,19 +171,19 @@ public OffsetCommitRequest(Struct struct) {
         if (struct.hasField(GENERATION_ID_KEY_NAME))
             generationId = struct.getInt(GENERATION_ID_KEY_NAME);
         else
-            generationId = DEFAULT_GENERATION_ID;
+            generationId = (int) CURRENT_SCHEMA.get(GENERATION_ID_KEY_NAME).defaultValue;
 
         // This field only exists in v1.
         if (struct.hasField(CONSUMER_ID_KEY_NAME))
             consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
         else
-            consumerId = DEFAULT_CONSUMER_ID;
+            consumerId = CURRENT_SCHEMA.get(CONSUMER_ID_KEY_NAME).defaultValue.toString();
 
         // This field only exists in v2
         if (struct.hasField(RETENTION_TIME_KEY_NAME))
             retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
         else
-            retentionTime = DEFAULT_RETENTION_TIME;
+            retentionTime = (long) CURRENT_SCHEMA.get(RETENTION_TIME_KEY_NAME).defaultValue;
 
         offsetData = new HashMap<TopicPartition, PartitionData>();
         for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
@@ -263,4 +254,21 @@ public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
     public static OffsetCommitRequest parse(ByteBuffer buffer) {
         return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    public static int getDefaultGenerationId() {
+        return (int) CURRENT_SCHEMA.get(GENERATION_ID_KEY_NAME).defaultValue;
+    }
+
+    public static long getDefaultRetentionId() {
+        return (long) CURRENT_SCHEMA.get(RETENTION_TIME_KEY_NAME).defaultValue;
+    }
+
+    public static String getDefaultConsumerId() {
+        return CURRENT_SCHEMA.get(CONSUMER_ID_KEY_NAME).defaultValue.toString();
+    }
+
+    public static long getDefaultTimestamp() {
+        Field timestampField = CURRENT_SCHEMA.getNestedSchema(TOPICS_KEY_NAME).getNestedSchema(PARTITIONS_KEY_NAME).get(TIMESTAMP_KEY_NAME);
+        return timestampField == null ? -1L : (long) timestampField.defaultValue;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 6ee75973d64..c1b0d1f1b8a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -89,8 +89,8 @@ public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
         Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
 
         for (TopicPartition partition: partitions) {
-            responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
-                    OffsetFetchResponse.NO_METADATA,
+            responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.getInvalidOffset(),
+                    OffsetFetchResponse.getEmptyMetadata(),
                     Errors.forException(e).code()));
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 3dc8521296e..52f5f1f2e34 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -41,9 +41,6 @@
     private static final String METADATA_KEY_NAME = "metadata";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
-    public static final long INVALID_OFFSET = -1L;
-    public static final String NO_METADATA = "";
-
     /**
      * Possible error code:
      *
@@ -123,4 +120,14 @@ public OffsetFetchResponse(Struct struct) {
     public static OffsetFetchResponse parse(ByteBuffer buffer) {
         return new OffsetFetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
     }
+
+    public static long getInvalidOffset() {
+        return (long) CURRENT_SCHEMA.getNestedSchema(RESPONSES_KEY_NAME).getNestedSchema(PARTITIONS_KEY_NAME)
+                .get(COMMIT_OFFSET_KEY_NAME).defaultValue;
+    }
+
+    public static String getEmptyMetadata() {
+        return (String) CURRENT_SCHEMA.getNestedSchema(RESPONSES_KEY_NAME).getNestedSchema(PARTITIONS_KEY_NAME)
+                .get(METADATA_KEY_NAME).defaultValue;
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 28685501897..d76fe0d8b86 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -43,7 +43,6 @@
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     public static final long INVALID_OFFSET = -1L;
-    private static final int DEFAULT_THROTTLE_TIME = 0;
 
     /**
      * Possible error code:
@@ -64,7 +63,7 @@ public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
         super(new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0)));
         initCommonFields(responses);
         this.responses = responses;
-        this.throttleTime = DEFAULT_THROTTLE_TIME;
+        this.throttleTime = (int) CURRENT_SCHEMA.get(THROTTLE_TIME_KEY_NAME).defaultValue;
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 66b2e32f402..a7b4fd356cc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -383,8 +383,8 @@ public void testResetGeneration() {
             @Override
             public boolean matches(ClientRequest request) {
                 OffsetCommitRequest commitRequest = new OffsetCommitRequest(request.request().body());
-                return commitRequest.consumerId().equals(OffsetCommitRequest.DEFAULT_CONSUMER_ID) &&
-                        commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
+                return commitRequest.consumerId().equals(OffsetCommitRequest.getDefaultConsumerId()) &&
+                        commitRequest.generationId() == OffsetCommitRequest.getDefaultGenerationId();
             }
         }, offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
 
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 5b362ef7a76..13451978850 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -48,20 +48,20 @@ object OffsetCommitRequest extends Logging {
       if (versionId >= 1)
         buffer.getInt
       else
-        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID
+        org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultGenerationId
 
     val consumerId: String =
       if (versionId >= 1)
         readShortString(buffer)
       else
-        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID
+        org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultConsumerId
 
     // version 2 specific fields
     val retentionMs: Long =
       if (versionId >= 2)
         buffer.getLong
       else
-        org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME
+        org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultRetentionId
 
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
@@ -92,9 +92,9 @@ case class OffsetCommitRequest(groupId: String,
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
                                correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId,
-                               groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_GENERATION_ID,
-                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_CONSUMER_ID,
-                               retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME)
+                               groupGenerationId: Int = org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultGenerationId,
+                               consumerId: String =  org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultConsumerId,
+                               retentionMs: Long = org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultRetentionId)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
 
   assert(versionId == 0 || versionId == 1 || versionId == 2,
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index bf23e9ba198..0e5c14f6ff9 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -121,7 +121,7 @@ class ConsumerCoordinator(val brokerId: Int,
       // exist we should reject the request
       var group = coordinatorMetadata.getGroup(groupId)
       if (group == null) {
-        if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
+        if (consumerId != JoinGroupRequest.getDefaultConsumerId) {
           responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
         } else {
           group = coordinatorMetadata.addGroup(groupId, partitionAssignmentStrategy)
@@ -148,7 +148,7 @@ class ConsumerCoordinator(val brokerId: Int,
         responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
       } else if (partitionAssignmentStrategy != group.partitionAssignmentStrategy) {
         responseCallback(Set.empty, consumerId, 0, Errors.INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY.code)
-      } else if (consumerId != JoinGroupRequest.UNKNOWN_CONSUMER_ID && !group.has(consumerId)) {
+      } else if (consumerId != JoinGroupRequest.getDefaultConsumerId && !group.has(consumerId)) {
         // if the consumer trying to register with a un-recognized id, send the response to let
         // it reset its consumer id and retry
         responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_CONSUMER_ID.code)
@@ -161,7 +161,7 @@ class ConsumerCoordinator(val brokerId: Int,
         completeAndScheduleNextHeartbeatExpiration(group, consumer)
         responseCallback(consumer.assignedTopicPartitions, consumerId, group.generationId, Errors.NONE.code)
       } else {
-        val consumer = if (consumerId == JoinGroupRequest.UNKNOWN_CONSUMER_ID) {
+        val consumer = if (consumerId == JoinGroupRequest.getDefaultConsumerId) {
           // if the consumer id is unknown, register this consumer to the group
           val generatedConsumerId = group.generateNextConsumerId
           val consumer = addConsumer(generatedConsumerId, topics, sessionTimeoutMs, group)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6acab8da18f..e6235c3c114 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -239,7 +239,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       // if it is v1 or not specified by user, we can use the default retention
       val offsetRetention =
         if (offsetCommitRequest.versionId <= 1 ||
-          offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
+          offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.getDefaultRetentionId) {
           coordinator.offsetConfig.offsetsRetentionMs
         } else {
           offsetCommitRequest.retentionMs
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index c108955f59f..5d63d89ef70 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -71,7 +71,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupWrongCoordinator() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = false)
@@ -82,7 +82,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupUnknownPartitionAssignmentStrategy() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "foo"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
@@ -93,7 +93,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupSessionTimeoutTooSmall() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMinSessionTimeout - 1, isCoordinatorForGroup = true)
@@ -104,7 +104,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupSessionTimeoutTooLarge() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, ConsumerMaxSessionTimeout + 1, isCoordinatorForGroup = true)
@@ -126,7 +126,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testValidJoinGroup() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
@@ -137,8 +137,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupInconsistentPartitionAssignmentStrategy() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
+    val otherConsumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
     val otherPartitionAssignmentStrategy = "roundrobin"
 
@@ -155,7 +155,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testJoinGroupUnknownConsumerExistingGroup() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val otherConsumerId = "consumerId"
     val partitionAssignmentStrategy = "range"
 
@@ -190,7 +190,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testHeartbeatUnknownConsumerExistingGroup() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val otherConsumerId = "consumerId"
     val partitionAssignmentStrategy = "range"
 
@@ -206,7 +206,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testHeartbeatIllegalGeneration() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
@@ -222,7 +222,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testValidHeartbeat() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
@@ -253,8 +253,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     val tp = new TopicAndPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_CONSUMER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset), true)
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.getDefaultConsumerId,
+      OffsetCommitRequest.getDefaultGenerationId, Map(tp -> offset), true)
     assertEquals(Errors.NONE.code, commitOffsetResult(tp))
   }
 
@@ -264,7 +264,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     val partitionAssignmentStrategy = "range"
 
     // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts)
-    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.getDefaultConsumerId, partitionAssignmentStrategy,
       DefaultSessionTimeout, isCoordinatorForGroup = true)
     val assignedConsumerId = joinGroupResult._2
     val initialGenerationId = joinGroupResult._3
@@ -273,7 +273,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
 
     // Then join with a new consumer to trigger a rebalance
     EasyMock.reset(offsetManager)
-    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_CONSUMER_ID, partitionAssignmentStrategy,
+    sendJoinGroup(groupId, JoinGroupRequest.getDefaultConsumerId, partitionAssignmentStrategy,
       DefaultSessionTimeout, isCoordinatorForGroup = true)
 
     // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress
@@ -285,8 +285,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testGenerationIdIncrementsOnRebalance() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
-    val otherConsumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
+    val otherConsumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)
@@ -306,7 +306,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testLeaveGroupWrongCoordinator() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
 
     val leaveGroupResult = leaveGroup(groupId, consumerId, isCoordinatorForGroup = false)
     assertEquals(Errors.NOT_COORDINATOR_FOR_CONSUMER.code, leaveGroupResult)
@@ -324,7 +324,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testLeaveGroupUnknownConsumerExistingGroup() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val otherConsumerId = "consumerId"
     val partitionAssignmentStrategy = "range"
 
@@ -340,7 +340,7 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testValidLeaveGroup() {
     val groupId = "groupId"
-    val consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID
+    val consumerId = JoinGroupRequest.getDefaultConsumerId
     val partitionAssignmentStrategy = "range"
 
     val joinGroupResult = joinGroup(groupId, consumerId, partitionAssignmentStrategy, DefaultSessionTimeout, isCoordinatorForGroup = true)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Move protocol field default values to Protocol
> ----------------------------------------------
>
>                 Key: KAFKA-2617
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2617
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Jakub Nowak
>            Priority: Minor
>              Labels: newbie
>
> Right now the default values are scattered across the Request / Response classes, and some duplicates already exist, such as JoinGroupRequest.UNKNOWN_CONSUMER_ID and OffsetCommitRequest.DEFAULT_CONSUMER_ID. We would like to move all default values into org.apache.kafka.common.protocol.Protocol, since org.apache.kafka.common.requests already depends on org.apache.kafka.common.protocol.
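
As a companion to the description above, here is a rough sketch of how a default on a nested (array) field can be looked up once the getNestedSchema helper added in the pull request is available. The schema layout is a simplified stand-in for the real offset-commit schemas, and the class name NestedDefaultSketch is illustrative only:

    import org.apache.kafka.common.protocol.types.ArrayOf;
    import org.apache.kafka.common.protocol.types.Field;
    import org.apache.kafka.common.protocol.types.Schema;

    import static org.apache.kafka.common.protocol.types.Type.INT64;
    import static org.apache.kafka.common.protocol.types.Type.STRING;

    public class NestedDefaultSketch {
        public static void main(String[] args) {
            // topics -> array of (topic, partitions -> array of (offset, metadata)),
            // loosely mirroring the offset-commit request layout.
            Schema partition = new Schema(
                    new Field("offset", INT64, "Message offset to be committed.", -1L),
                    new Field("metadata", STRING, "Any associated metadata.", ""));
            Schema topic = new Schema(
                    new Field("topic", STRING),
                    new Field("partitions", new ArrayOf(partition)));
            Schema request = new Schema(
                    new Field("group_id", STRING),
                    new Field("topics", new ArrayOf(topic)));

            // getNestedSchema walks one array level at a time, so callers can
            // reach the defaults declared on nested fields.
            long defaultOffset = (long) request.getNestedSchema("topics")
                    .getNestedSchema("partitions")
                    .get("offset").defaultValue;
            System.out.println("default offset = " + defaultOffset);
        }
    }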



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)