You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2015/01/30 03:39:33 UTC
[3/7] kafka git commit: KAFKA-1760: New consumer.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 121e880..ee1f78f 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -1,18 +1,14 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.protocol.types;
@@ -124,14 +120,6 @@ public class Struct {
return (Long) get(name);
}
- public ByteBuffer getBytes(Field field) {
- return (ByteBuffer) get(field);
- }
-
- public ByteBuffer getBytes(String name) {
- return (ByteBuffer) get(name);
- }
-
public Object[] getArray(Field field) {
return (Object[]) get(field);
}
@@ -148,6 +136,14 @@ public class Struct {
return (String) get(name);
}
+ public ByteBuffer getBytes(Field field) {
+ return (ByteBuffer) get(field);
+ }
+
+ public ByteBuffer getBytes(String name) {
+ return (ByteBuffer) get(name);
+ }
+
/**
* Set the given field to the specified value
*
@@ -175,9 +171,9 @@ public class Struct {
}
/**
- * Create a struct for the schema of a container type (struct or array).
- * Note that for array type, this method assumes that the type is an array of schema and creates a struct
- * of that schema. Arrays of other types can't be instantiated with this method.
+ * Create a struct for the schema of a container type (struct or array). Note that for array type, this method
+ * assumes that the type is an array of schema and creates a struct of that schema. Arrays of other types can't be
+ * instantiated with this method.
*
* @param field The field to create an instance of
* @return The struct
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index e4d688c..2e54b56 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -41,4 +41,8 @@ public final class LogEntry {
public String toString() {
return "LogEntry(" + offset + ", " + record + ")";
}
+
+ public int size() {
+ return record.size() + Records.LOG_OVERHEAD;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 040e5b9..cc4084f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -55,7 +55,7 @@ public class MemoryRecords implements Records {
return emptyRecords(buffer, type, buffer.capacity());
}
- public static MemoryRecords iterableRecords(ByteBuffer buffer) {
+ public static MemoryRecords readableRecords(ByteBuffer buffer) {
return new MemoryRecords(buffer, CompressionType.NONE, false, buffer.capacity());
}
@@ -94,22 +94,21 @@ public class MemoryRecords implements Records {
* Note that the return value is based on the estimate of the bytes written to the compressor, which may not be
* accurate if compression is really used. When this happens, the following append may cause dynamic buffer
* re-allocation in the underlying byte buffer stream.
- *
+ *
* Also note that besides the records' capacity, there is also a size limit for the batch. This size limit may be
* smaller than the capacity (e.g. when appending a single message whose size is larger than the batch size, the
- * capacity will be the message size, but the size limit will still be the batch size), and when the records' size has
- * exceed this limit we also mark this record as full.
+ * capacity will be the message size, but the size limit will still be the batch size), and when the records' size
+ * has exceeded this limit we also mark this record as full.
*/
public boolean hasRoomFor(byte[] key, byte[] value) {
- return this.writable &&
- this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value) &&
- this.sizeLimit >= this.compressor.estimatedBytesWritten();
+ return this.writable && this.capacity >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD +
+ Record.recordSize(key, value) &&
+ this.sizeLimit >= this.compressor.estimatedBytesWritten();
}
public boolean isFull() {
- return !this.writable ||
- this.capacity <= this.compressor.estimatedBytesWritten() ||
- this.sizeLimit <= this.compressor.estimatedBytesWritten();
+ return !this.writable || this.capacity <= this.compressor.estimatedBytesWritten() ||
+ this.sizeLimit <= this.compressor.estimatedBytesWritten();
}
/**
@@ -132,7 +131,7 @@ public class MemoryRecords implements Records {
public int sizeInBytes() {
return compressor.buffer().position();
}
-
+
/**
* The compression rate of this record set
*/
@@ -162,6 +161,25 @@ public class MemoryRecords implements Records {
ByteBuffer copy = (ByteBuffer) this.buffer.duplicate().flip();
return new RecordsIterator(copy, CompressionType.NONE, false);
}
+
+ @Override
+ public String toString() {
+ Iterator<LogEntry> iter = iterator();
+ StringBuilder builder = new StringBuilder();
+ builder.append('[');
+ while(iter.hasNext()) {
+ LogEntry entry = iter.next();
+ builder.append('(');
+ builder.append("offset=");
+ builder.append(entry.offset());
+ builder.append(",");
+ builder.append("record=");
+ builder.append(entry.record());
+ builder.append(")");
+ }
+ builder.append(']');
+ return builder.toString();
+ }
public static class RecordsIterator extends AbstractIterator<LogEntry> {
private final ByteBuffer buffer;
@@ -174,7 +192,7 @@ public class MemoryRecords implements Records {
this.type = type;
this.buffer = buffer;
this.shallow = shallow;
- stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
+ this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type);
}
/*
@@ -199,7 +217,10 @@ public class MemoryRecords implements Records {
ByteBuffer rec;
if (type == CompressionType.NONE) {
rec = buffer.slice();
- buffer.position(buffer.position() + size);
+ int newPos = buffer.position() + size;
+ if(newPos > buffer.limit())
+ return allDone();
+ buffer.position(newPos);
rec.limit(size);
} else {
byte[] recordBuffer = new byte[size];
@@ -207,7 +228,6 @@ public class MemoryRecords implements Records {
rec = ByteBuffer.wrap(recordBuffer);
}
LogEntry entry = new LogEntry(offset, new Record(rec));
- entry.record().ensureValid();
// decide whether to go shallow or deep iteration if it is compressed
CompressionType compression = entry.record().compressionType();
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
index 99b52c2..4c99d4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
@@ -20,13 +20,14 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class ConsumerMetadataRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
- private static String GROUP_ID_KEY_NAME = "group_id";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONSUMER_METADATA.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
private final String groupId;
public ConsumerMetadataRequest(String groupId) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
this.groupId = groupId;
@@ -42,6 +43,6 @@ public class ConsumerMetadataRequest extends AbstractRequestResponse {
}
public static ConsumerMetadataRequest parse(ByteBuffer buffer) {
- return new ConsumerMetadataRequest(((Struct) curSchema.read(buffer)));
+ return new ConsumerMetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
index 8b8f591..173333b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
@@ -21,20 +21,21 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class ConsumerMetadataResponse extends AbstractRequestResponse {
- private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
- private static String ERROR_CODE_KEY_NAME = "error_code";
- private static String COORDINATOR_KEY_NAME = "coordinator";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONSUMER_METADATA.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String COORDINATOR_KEY_NAME = "coordinator";
// coordinator level field names
- private static String NODE_ID_KEY_NAME = "node_id";
- private static String HOST_KEY_NAME = "host";
- private static String PORT_KEY_NAME = "port";
+ private static final String NODE_ID_KEY_NAME = "node_id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
private final short errorCode;
private final Node node;
public ConsumerMetadataResponse(short errorCode, Node node) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(ERROR_CODE_KEY_NAME, errorCode);
Struct coordinator = struct.instance(COORDINATOR_KEY_NAME);
coordinator.set(NODE_ID_KEY_NAME, node.id());
@@ -64,6 +65,6 @@ public class ConsumerMetadataResponse extends AbstractRequestResponse {
}
public static ConsumerMetadataResponse parse(ByteBuffer buffer) {
- return new ConsumerMetadataResponse(((Struct) curSchema.read(buffer)));
+ return new ConsumerMetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 2fc471f..2529a09 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -1,21 +1,23 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -23,27 +25,23 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class FetchRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
- private static String REPLICA_ID_KEY_NAME = "replica_id";
- private static String MAX_WAIT_KEY_NAME = "max_wait_time";
- private static String MIN_BYTES_KEY_NAME = "min_bytes";
- private static String TOPICS_KEY_NAME = "topics";
+
+ public static final int CONSUMER_REPLICA_ID = -1;
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
+ private static final String REPLICA_ID_KEY_NAME = "replica_id";
+ private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
+ private static final String MIN_BYTES_KEY_NAME = "min_bytes";
+ private static final String TOPICS_KEY_NAME = "topics";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String FETCH_OFFSET_KEY_NAME = "fetch_offset";
- private static String MAX_BYTES_KEY_NAME = "max_bytes";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
+ private static final String MAX_BYTES_KEY_NAME = "max_bytes";
private final int replicaId;
private final int maxWait;
@@ -60,15 +58,25 @@ public class FetchRequest extends AbstractRequestResponse {
}
}
+ /**
+ * Create a non-replica fetch request
+ */
+ public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
+ this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
+ }
+
+ /**
+ * Create a replica fetch request
+ */
public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
struct.set(REPLICA_ID_KEY_NAME, replicaId);
struct.set(MAX_WAIT_KEY_NAME, maxWait);
struct.set(MIN_BYTES_KEY_NAME, minBytes);
List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
List<Struct> partitionArray = new ArrayList<Struct>();
@@ -127,6 +135,6 @@ public class FetchRequest extends AbstractRequestResponse {
}
public static FetchRequest parse(ByteBuffer buffer) {
- return new FetchRequest(((Struct) curSchema.read(buffer)));
+ return new FetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f719010..c1e5f44 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -30,18 +30,19 @@ import java.util.List;
import java.util.Map;
public class FetchResponse extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
- private static String RESPONSES_KEY_NAME = "responses";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partition_responses";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String ERROR_CODE_KEY_NAME = "error_code";
- private static String HIGH_WATERMARK_KEY_NAME = "high_watermark";
- private static String RECORD_SET_KEY_NAME = "record_set";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
+ private static final String RECORD_SET_KEY_NAME = "record_set";
private final Map<TopicPartition, PartitionData> responseData;
@@ -58,7 +59,7 @@ public class FetchResponse extends AbstractRequestResponse {
}
public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<Struct>();
@@ -105,6 +106,6 @@ public class FetchResponse extends AbstractRequestResponse {
}
public static FetchResponse parse(ByteBuffer buffer) {
- return new FetchResponse(((Struct) curSchema.read(buffer)));
+ return new FetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 9512db2..cfdb5de 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -20,17 +20,18 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class HeartbeatRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
- private static String GROUP_ID_KEY_NAME = "group_id";
- private static String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
- private static String CONSUMER_ID_KEY_NAME = "consumer_id";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
private final String groupId;
private final int groupGenerationId;
private final String consumerId;
public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
struct.set(CONSUMER_ID_KEY_NAME, consumerId);
@@ -59,6 +60,6 @@ public class HeartbeatRequest extends AbstractRequestResponse {
}
public static HeartbeatRequest parse(ByteBuffer buffer) {
- return new HeartbeatRequest(((Struct) curSchema.read(buffer)));
+ return new HeartbeatRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 8997ffc..ea964f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -20,12 +20,13 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
public class HeartbeatResponse extends AbstractRequestResponse {
- private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
- private static String ERROR_CODE_KEY_NAME = "error_code";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
private final short errorCode;
public HeartbeatResponse(short errorCode) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(ERROR_CODE_KEY_NAME, errorCode);
this.errorCode = errorCode;
}
@@ -40,6 +41,6 @@ public class HeartbeatResponse extends AbstractRequestResponse {
}
public static HeartbeatResponse parse(ByteBuffer buffer) {
- return new HeartbeatResponse(((Struct) curSchema.read(buffer)));
+ return new HeartbeatResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index d6e91f3..a1d48c9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -22,12 +22,13 @@ import java.util.ArrayList;
import java.util.List;
public class JoinGroupRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
- private static String GROUP_ID_KEY_NAME = "group_id";
- private static String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
- private static String TOPICS_KEY_NAME = "topics";
- private static String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static String STRATEGY_KEY_NAME = "partition_assignment_strategy";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
+ private static final String TOPICS_KEY_NAME = "topics";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
private final String groupId;
private final int sessionTimeout;
@@ -36,7 +37,7 @@ public class JoinGroupRequest extends AbstractRequestResponse {
private final String strategy;
public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(GROUP_ID_KEY_NAME, groupId);
struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
struct.set(TOPICS_KEY_NAME, topics.toArray());
@@ -82,6 +83,6 @@ public class JoinGroupRequest extends AbstractRequestResponse {
}
public static JoinGroupRequest parse(ByteBuffer buffer) {
- return new JoinGroupRequest(((Struct) curSchema.read(buffer)));
+ return new JoinGroupRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index efe8979..1e9f349 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -23,16 +23,17 @@ import java.nio.ByteBuffer;
import java.util.*;
public class JoinGroupResponse extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
- private static String ERROR_CODE_KEY_NAME = "error_code";
- private static String GENERATION_ID_KEY_NAME = "group_generation_id";
- private static String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partitions";
-
- public static int UNKNOWN_GENERATION_ID = -1;
- public static String UNKNOWN_CONSUMER_ID = "";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
+
+ public static final int UNKNOWN_GENERATION_ID = -1;
+ public static final String UNKNOWN_CONSUMER_ID = "";
private final short errorCode;
private final int generationId;
@@ -40,7 +41,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
private final List<TopicPartition> assignedPartitions;
public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
@@ -97,6 +98,6 @@ public class JoinGroupResponse extends AbstractRequestResponse {
}
public static JoinGroupResponse parse(ByteBuffer buffer) {
- return new JoinGroupResponse(((Struct) curSchema.read(buffer)));
+ return new JoinGroupResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 99364c1..05c5fed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -30,18 +30,19 @@ import java.util.List;
import java.util.Map;
public class ListOffsetRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
- private static String REPLICA_ID_KEY_NAME = "replica_id";
- private static String TOPICS_KEY_NAME = "topics";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
+ private static final String REPLICA_ID_KEY_NAME = "replica_id";
+ private static final String TOPICS_KEY_NAME = "topics";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String TIMESTAMP_KEY_NAME = "timestamp";
- private static String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String TIMESTAMP_KEY_NAME = "timestamp";
+ private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
private final int replicaId;
private final Map<TopicPartition, PartitionData> offsetData;
@@ -55,9 +56,13 @@ public class ListOffsetRequest extends AbstractRequestResponse {
this.maxNumOffsets = maxNumOffsets;
}
}
+
+ public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
+ this(-1, offsetData);
+ }
public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
struct.set(REPLICA_ID_KEY_NAME, replicaId);
@@ -109,6 +114,6 @@ public class ListOffsetRequest extends AbstractRequestResponse {
}
public static ListOffsetRequest parse(ByteBuffer buffer) {
- return new ListOffsetRequest(((Struct) curSchema.read(buffer)));
+ return new ListOffsetRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index ac23971..b2e473e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -30,17 +30,18 @@ import java.util.List;
import java.util.Map;
public class ListOffsetResponse extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
- private static String RESPONSES_KEY_NAME = "responses";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partition_responses";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String ERROR_CODE_KEY_NAME = "error_code";
- private static String OFFSETS_KEY_NAME = "offsets";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String OFFSETS_KEY_NAME = "offsets";
private final Map<TopicPartition, PartitionData> responseData;
@@ -55,7 +56,7 @@ public class ListOffsetResponse extends AbstractRequestResponse {
}
public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<Struct>();
@@ -103,6 +104,6 @@ public class ListOffsetResponse extends AbstractRequestResponse {
}
public static ListOffsetResponse parse(ByteBuffer buffer) {
- return new ListOffsetResponse(((Struct) curSchema.read(buffer)));
+ return new ListOffsetResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index b22ca1d..0186783 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -22,13 +22,14 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
public class MetadataRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
- private static String TOPICS_KEY_NAME = "topics";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
+ private static final String TOPICS_KEY_NAME = "topics";
private final List<String> topics;
public MetadataRequest(List<String> topics) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
struct.set(TOPICS_KEY_NAME, topics.toArray());
this.topics = topics;
}
@@ -47,6 +48,6 @@ public class MetadataRequest extends AbstractRequestResponse {
}
public static MetadataRequest parse(ByteBuffer buffer) {
- return new MetadataRequest(((Struct) curSchema.read(buffer)));
+ return new MetadataRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index d97962d..13daf59 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -28,32 +28,33 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
public class MetadataResponse extends AbstractRequestResponse {
- private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
- private static String BROKERS_KEY_NAME = "brokers";
- private static String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
+
+ private static Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
+ private static final String BROKERS_KEY_NAME = "brokers";
+ private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
// broker level field names
- private static String NODE_ID_KEY_NAME = "node_id";
- private static String HOST_KEY_NAME = "host";
- private static String PORT_KEY_NAME = "port";
+ private static final String NODE_ID_KEY_NAME = "node_id";
+ private static final String HOST_KEY_NAME = "host";
+ private static final String PORT_KEY_NAME = "port";
// topic level field names
- private static String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITION_METADATA_KEY_NAME = "partition_metadata";
+ private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
// partition level field names
- private static String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
- private static String PARTITION_KEY_NAME = "partition_id";
- private static String LEADER_KEY_NAME = "leader";
- private static String REPLICAS_KEY_NAME = "replicas";
- private static String ISR_KEY_NAME = "isr";
+ private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
+ private static final String PARTITION_KEY_NAME = "partition_id";
+ private static final String LEADER_KEY_NAME = "leader";
+ private static final String REPLICAS_KEY_NAME = "replicas";
+ private static final String ISR_KEY_NAME = "isr";
private final Cluster cluster;
private final Map<String, Errors> errors;
public MetadataResponse(Cluster cluster) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
List<Struct> brokerArray = new ArrayList<Struct>();
for (Node node: cluster.nodes()) {
@@ -147,6 +148,6 @@ public class MetadataResponse extends AbstractRequestResponse {
}
public static MetadataResponse parse(ByteBuffer buffer) {
- return new MetadataResponse(((Struct) curSchema.read(buffer)));
+ return new MetadataResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 3ee5cba..4fb48c8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -3,15 +3,21 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
@@ -19,31 +25,26 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* This wrapper supports both v0 and v1 of OffsetCommitRequest.
*/
public class OffsetCommitRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
- private static String GROUP_ID_KEY_NAME = "group_id";
- private static String GENERATION_ID_KEY_NAME = "group_generation_id";
- private static String CONSUMER_ID_KEY_NAME = "consumer_id";
- private static String TOPICS_KEY_NAME = "topics";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
+ private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
+ private static final String TOPICS_KEY_NAME = "topics";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String COMMIT_OFFSET_KEY_NAME = "offset";
- private static String TIMESTAMP_KEY_NAME = "timestamp";
- private static String METADATA_KEY_NAME = "metadata";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+ private static final String TIMESTAMP_KEY_NAME = "timestamp";
+ private static final String METADATA_KEY_NAME = "metadata";
public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_CONSUMER_ID = "";
@@ -88,7 +89,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
* @param offsetData
*/
public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
initCommonFields(groupId, offsetData);
struct.set(GENERATION_ID_KEY_NAME, generationId);
@@ -104,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
struct.set(GROUP_ID_KEY_NAME, groupId);
List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
+ for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
List<Struct> partitionArray = new ArrayList<Struct>();
@@ -175,6 +176,6 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
}
public static OffsetCommitRequest parse(ByteBuffer buffer) {
- return new OffsetCommitRequest(((Struct) curSchema.read(buffer)));
+ return new OffsetCommitRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 711232a..2ab1dc6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -26,21 +26,22 @@ import java.util.List;
import java.util.Map;
public class OffsetCommitResponse extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
- private static String RESPONSES_KEY_NAME = "responses";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
// topic level fields
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partition_responses";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
// partition level fields
- private static String PARTITION_KEY_NAME = "partition";
- private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
private final Map<TopicPartition, Short> responseData;
public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
@@ -82,6 +83,6 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
}
public static OffsetCommitResponse parse(ByteBuffer buffer) {
- return new OffsetCommitResponse(((Struct) curSchema.read(buffer)));
+ return new OffsetCommitResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 90d5135..333483f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -28,16 +28,17 @@ import java.util.Map;
* This wrapper supports both v0 and v1 of OffsetFetchRequest.
*/
public class OffsetFetchRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
- private static String GROUP_ID_KEY_NAME = "group_id";
- private static String TOPICS_KEY_NAME = "topics";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_FETCH.id);
+ private static final String GROUP_ID_KEY_NAME = "group_id";
+ private static final String TOPICS_KEY_NAME = "topics";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partitions";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partitions";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
+ private static final String PARTITION_KEY_NAME = "partition";
public static final int DEFAULT_GENERATION_ID = -1;
public static final String DEFAULT_CONSUMER_ID = "";
@@ -46,7 +47,7 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
private final List<TopicPartition> partitions;
public OffsetFetchRequest(String groupId, List<TopicPartition> partitions) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
@@ -93,6 +94,6 @@ public class OffsetFetchRequest extends AbstractRequestResponse {
}
public static OffsetFetchRequest parse(ByteBuffer buffer) {
- return new OffsetFetchRequest(((Struct) curSchema.read(buffer)));
+ return new OffsetFetchRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 6b7c269..04c88c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -3,43 +3,45 @@
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.common.requests;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class OffsetFetchResponse extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
- private static String RESPONSES_KEY_NAME = "responses";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
// topic level fields
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITIONS_KEY_NAME = "partition_responses";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITIONS_KEY_NAME = "partition_responses";
// partition level fields
- private static String PARTITION_KEY_NAME = "partition";
- private static String COMMIT_OFFSET_KEY_NAME = "offset";
- private static String METADATA_KEY_NAME = "metadata";
- private static String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String COMMIT_OFFSET_KEY_NAME = "offset";
+ private static final String METADATA_KEY_NAME = "metadata";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
- private final Map<TopicPartition,PartitionData> responseData;
+ private final Map<TopicPartition, PartitionData> responseData;
public static final class PartitionData {
public final long offset;
@@ -51,15 +53,19 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
this.metadata = metadata;
this.errorCode = errorCode;
}
+
+ public boolean hasError() {
+ return this.errorCode != Errors.NONE.code();
+ }
}
public OffsetFetchResponse(Map<TopicPartition, PartitionData> responseData) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
List<Struct> topicArray = new ArrayList<Struct>();
- for (Map.Entry<String, Map<Integer, PartitionData>> entries: topicsData.entrySet()) {
+ for (Map.Entry<String, Map<Integer, PartitionData>> entries : topicsData.entrySet()) {
Struct topicData = struct.instance(RESPONSES_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, entries.getKey());
List<Struct> partitionArray = new ArrayList<Struct>();
@@ -102,6 +108,6 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
}
public static OffsetFetchResponse parse(ByteBuffer buffer) {
- return new OffsetFetchResponse(((Struct) curSchema.read(buffer)));
+ return new OffsetFetchResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 3dbba8a..03a0ab1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -27,25 +27,26 @@ import java.util.List;
import java.util.Map;
public class ProduceRequest extends AbstractRequestResponse {
- public static Schema curSchema = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
- private static String ACKS_KEY_NAME = "acks";
- private static String TIMEOUT_KEY_NAME = "timeout";
- private static String TOPIC_DATA_KEY_NAME = "topic_data";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.PRODUCE.id);
+ private static final String ACKS_KEY_NAME = "acks";
+ private static final String TIMEOUT_KEY_NAME = "timeout";
+ private static final String TOPIC_DATA_KEY_NAME = "topic_data";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITION_DATA_KEY_NAME = "data";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_DATA_KEY_NAME = "data";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String RECORD_SET_KEY_NAME = "record_set";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String RECORD_SET_KEY_NAME = "record_set";
private final short acks;
private final int timeout;
private final Map<TopicPartition, ByteBuffer> partitionRecords;
public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);
@@ -100,6 +101,6 @@ public class ProduceRequest extends AbstractRequestResponse {
}
public static ProduceRequest parse(ByteBuffer buffer) {
- return new ProduceRequest(((Struct) curSchema.read(buffer)));
+ return new ProduceRequest(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 5220464..e42d7db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -26,22 +26,23 @@ import java.util.List;
import java.util.Map;
public class ProduceResponse extends AbstractRequestResponse {
- private static Schema curSchema = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
- private static String RESPONSES_KEY_NAME = "responses";
+
+ private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
+ private static final String RESPONSES_KEY_NAME = "responses";
// topic level field names
- private static String TOPIC_KEY_NAME = "topic";
- private static String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
+ private static final String TOPIC_KEY_NAME = "topic";
+ private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
// partition level field names
- private static String PARTITION_KEY_NAME = "partition";
- private static String ERROR_CODE_KEY_NAME = "error_code";
- private static String BASE_OFFSET_KEY_NAME = "base_offset";
+ private static final String PARTITION_KEY_NAME = "partition";
+ private static final String ERROR_CODE_KEY_NAME = "error_code";
+ private static final String BASE_OFFSET_KEY_NAME = "base_offset";
private final Map<TopicPartition, PartitionResponse> responses;
public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
- super(new Struct(curSchema));
+ super(new Struct(CURRENT_SCHEMA));
Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
@@ -107,6 +108,6 @@ public class ProduceResponse extends AbstractRequestResponse {
}
public static ProduceResponse parse(ByteBuffer buffer) {
- return new ProduceResponse(((Struct) curSchema.read(buffer)));
+ return new ProduceResponse(((Struct) CURRENT_SCHEMA.read(buffer)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index 3c001d3..13be6a3 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -29,7 +29,7 @@ public interface Deserializer<T> {
* @param isKey whether is for key or value
*/
public void configure(Map<String, ?> configs, boolean isKey);
-
+
/**
*
* @param topic topic associated with the data
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 527dd0f..8a305b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -17,6 +17,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -224,6 +227,18 @@ public class Utils {
}
/**
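+ * Sleep for the given number of milliseconds, returning early (without an exception) if the thread is interrupted
+ * @param ms The duration of the sleep, in milliseconds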
+ */
+ public static void sleep(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ // this is okay, we just wake up early
+ }
+ }
+
+ /**
* Instantiate the class
*/
public static Object newInstance(Class<?> c) {
@@ -313,4 +328,31 @@ public class Utils {
? "[" + host + "]:" + port // IPv6
: host + ":" + port;
}
+
+ /**
+ * Create a string representation of an array joined by the given separator
+ * @param strs The array of items
+ * @param seperator The separator
+ * @return The string representation.
+ */
+ public static <T> String join(T[] strs, String seperator) {
+ return join(Arrays.asList(strs), seperator);
+ }
+
+ /**
+ * Create a string representation of a list joined by the given separator
+ * @param list The list of items
+ * @param seperator The separator
+ * @return The string representation.
+ */
+ public static <T> String join(Collection<T> list, String seperator) {
+ StringBuilder sb = new StringBuilder();
+ Iterator<T> iter = list.iterator();
+ while(iter.hasNext()) {
+ sb.append(iter.next());
+ if(iter.hasNext())
+ sb.append(seperator);
+ }
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 47b5d4a..67bee40 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -59,13 +59,33 @@ public class MockClient implements KafkaClient {
}
@Override
- public List<ClientResponse> poll(List<ClientRequest> requests, long timeoutMs, long now) {
- this.requests.addAll(requests);
- List<ClientResponse> copy = new ArrayList<ClientResponse>(this.responses);
+ public void send(ClientRequest request) {
+ this.requests.add(request);
+ }
+
+ @Override
+ public List<ClientResponse> poll(long timeoutMs, long now) {
+ for(ClientResponse response: this.responses)
+ if (response.request().hasCallback())
+ response.request().callback().onComplete(response);
+ List<ClientResponse> copy = new ArrayList<ClientResponse>();
this.responses.clear();
return copy;
}
+ @Override
+ public List<ClientResponse> completeAll(int node, long now) {
+ return completeAll(now);
+ }
+
+ @Override
+ public List<ClientResponse> completeAll(long now) {
+ List<ClientResponse> responses = poll(0, now);
+ if (requests.size() > 0)
+ throw new IllegalStateException("Requests without responses remain.");
+ return responses;
+ }
+
public Queue<ClientRequest> requests() {
return this.requests;
}
@@ -81,6 +101,11 @@ public class MockClient implements KafkaClient {
}
@Override
+ public int inFlightRequestCount(int nodeId) {
+ return requests.size();
+ }
+
+ @Override
public RequestHeader nextRequestHeader(ApiKeys key) {
return new RequestHeader(key.id, "mock", correlation++);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0699ff2c/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 1a55242..5debcd6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -5,7 +5,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -46,14 +45,13 @@ public class NetworkClientTest {
@Test
public void testReadyAndDisconnect() {
- List<ClientRequest> reqs = new ArrayList<ClientRequest>();
assertFalse("Client begins unready as it has no connection.", client.ready(node, time.milliseconds()));
assertEquals("The connection is established as a side-effect of the readiness check", 1, selector.connected().size());
- client.poll(reqs, 1, time.milliseconds());
+ client.poll(1, time.milliseconds());
selector.clear();
assertTrue("Now the client is ready", client.ready(node, time.milliseconds()));
selector.disconnect(node.id());
- client.poll(reqs, 1, time.milliseconds());
+ client.poll(1, time.milliseconds());
selector.clear();
assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds()));
assertTrue("Metadata should get updated.", metadata.timeToNextUpdate(time.milliseconds()) == 0);
@@ -65,7 +63,8 @@ public class NetworkClientTest {
client.nextRequestHeader(ApiKeys.METADATA),
new MetadataRequest(Arrays.asList("test")).toStruct());
ClientRequest request = new ClientRequest(time.milliseconds(), false, send, null);
- client.poll(Arrays.asList(request), 1, time.milliseconds());
+ client.send(request);
+ client.poll(1, time.milliseconds());
}
@Test
@@ -73,9 +72,11 @@ public class NetworkClientTest {
ProduceRequest produceRequest = new ProduceRequest((short) 1, 1000, Collections.<TopicPartition, ByteBuffer>emptyMap());
RequestHeader reqHeader = client.nextRequestHeader(ApiKeys.PRODUCE);
RequestSend send = new RequestSend(node.id(), reqHeader, produceRequest.toStruct());
- ClientRequest request = new ClientRequest(time.milliseconds(), true, send, null);
+ TestCallbackHandler handler = new TestCallbackHandler();
+ ClientRequest request = new ClientRequest(time.milliseconds(), true, send, handler);
awaitReady(client, node);
- client.poll(Arrays.asList(request), 1, time.milliseconds());
+ client.send(request);
+ client.poll(1, time.milliseconds());
assertEquals(1, client.inFlightRequestCount());
ResponseHeader respHeader = new ResponseHeader(reqHeader.correlationId());
Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
@@ -86,16 +87,26 @@ public class NetworkClientTest {
resp.writeTo(buffer);
buffer.flip();
selector.completeReceive(new NetworkReceive(node.id(), buffer));
- List<ClientResponse> responses = client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+ List<ClientResponse> responses = client.poll(1, time.milliseconds());
assertEquals(1, responses.size());
- ClientResponse response = responses.get(0);
- assertTrue("Should have a response body.", response.hasResponse());
- assertEquals("Should be correlated to the original request", request, response.request());
+ assertTrue("The handler should have executed.", handler.executed);
+ assertTrue("Should have a response body.", handler.response.hasResponse());
+ assertEquals("Should be correlated to the original request", request, handler.response.request());
}
private void awaitReady(NetworkClient client, Node node) {
while (!client.ready(node, time.milliseconds()))
- client.poll(new ArrayList<ClientRequest>(), 1, time.milliseconds());
+ client.poll(1, time.milliseconds());
+ }
+
+ private static class TestCallbackHandler implements RequestCompletionHandler {
+ public boolean executed = false;
+ public ClientResponse response;
+
+ public void onComplete(ClientResponse response) {
+ this.executed = true;
+ this.response = response;
+ }
}
}