Posted to commits@kafka.apache.org by jg...@apache.org on 2016/11/15 00:46:30 UTC
[4/5] kafka git commit: KAFKA-2066; Use client-side FetchRequest/FetchResponse on server
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
new file mode 100644
index 0000000..4a4d569
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogInputStream.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import java.io.IOException;
+
+/**
+ * An abstraction between an underlying input stream and record iterators: a LogInputStream
+ * returns only the shallow log entries, relying on {@link org.apache.kafka.common.record.RecordsIterator.DeepRecordsIterator}
+ * for deep iteration.
+ */
+interface LogInputStream {
+
+ /**
+ * Get the next log entry from the underlying input stream.
+ *
+ * @return The next log entry or null if there is none
+ * @throws IOException for any IO errors
+ */
+ LogEntry nextEntry() throws IOException;
+}
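
[Editor's note: a minimal sketch of how a caller might drain the contract above. Since the interface is package-private, the sketch redeclares a mirror of it so it stands alone; the drain helper and the generic parameter are illustrative assumptions, not code from this commit.]

    import java.io.IOException;

    public class LogInputStreamSketch {
        // Mirror of the package-private contract above, redeclared so the
        // sketch compiles on its own; a real helper would live in the
        // org.apache.kafka.common.record package alongside LogEntry.
        interface LogInputStream<T> {
            T nextEntry() throws IOException;
        }

        // Read shallow entries until nextEntry() signals the end with null.
        static <T> int drain(LogInputStream<T> in) throws IOException {
            int count = 0;
            while (in.nextEntry() != null)
                count++;
            return count;
        }
    }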
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 3848ea9..65ccf98 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -13,21 +13,18 @@
package org.apache.kafka.common.record;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
+import java.nio.channels.GatheringByteChannel;
import java.util.Iterator;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.AbstractIterator;
-import org.apache.kafka.common.utils.Utils;
-
/**
* A {@link Records} implementation backed by a ByteBuffer.
*/
public class MemoryRecords implements Records {
+ public final static MemoryRecords EMPTY = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
+
private final static int WRITE_LIMIT_FOR_READABLE_ONLY = -1;
// the compressor used for appends-only
@@ -148,6 +145,7 @@ public class MemoryRecords implements Records {
/**
* The size of this record set
*/
+ @Override
public int sizeInBytes() {
if (writable) {
return compressor.buffer().position();
@@ -156,6 +154,15 @@ public class MemoryRecords implements Records {
}
}
+ @Override
+ public long writeTo(GatheringByteChannel channel, long offset, int length) throws IOException {
+ ByteBuffer dup = buffer.duplicate();
+ int position = (int) offset;
+ dup.position(position);
+ dup.limit(position + length);
+ return channel.write(dup);
+ }
+
/**
* The compression rate of this record set
*/
@@ -186,13 +193,11 @@ public class MemoryRecords implements Records {
@Override
public Iterator<LogEntry> iterator() {
- if (writable) {
+ ByteBuffer input = this.buffer.duplicate();
+ if (writable)
// flip on a duplicate buffer for reading
- return new RecordsIterator((ByteBuffer) this.buffer.duplicate().flip(), false);
- } else {
- // do not need to flip for non-writable buffer
- return new RecordsIterator(this.buffer.duplicate(), false);
- }
+ input.flip();
+ return new RecordsIterator(new ByteBufferLogInputStream(input), false);
}
@Override
@@ -219,151 +224,44 @@ public class MemoryRecords implements Records {
return writable;
}
- public static class RecordsIterator extends AbstractIterator<LogEntry> {
- private final ByteBuffer buffer;
- private final DataInputStream stream;
- private final CompressionType type;
- private final boolean shallow;
- private RecordsIterator innerIter;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
- // The variables for inner iterator
- private final ArrayDeque<LogEntry> logEntries;
- private final long absoluteBaseOffset;
+ MemoryRecords that = (MemoryRecords) o;
- public RecordsIterator(ByteBuffer buffer, boolean shallow) {
- this.type = CompressionType.NONE;
- this.buffer = buffer;
- this.shallow = shallow;
- this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
- this.logEntries = null;
- this.absoluteBaseOffset = -1;
- }
+ return buffer.equals(that.buffer);
- // Private constructor for inner iterator.
- private RecordsIterator(LogEntry entry) {
- this.type = entry.record().compressionType();
- this.buffer = entry.record().value();
- this.shallow = true;
- this.stream = Compressor.wrapForInput(new ByteBufferInputStream(this.buffer), type, entry.record().magic());
- long wrapperRecordOffset = entry.offset();
-
- long wrapperRecordTimestamp = entry.record().timestamp();
- this.logEntries = new ArrayDeque<>();
- // If relative offset is used, we need to decompress the entire message first to compute
- // the absolute offset. For simplicity and because it's a format that is on its way out, we
- // do the same for message format version 0
- try {
- while (true) {
- try {
- LogEntry logEntry = getNextEntryFromStream();
- if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
- Record recordWithTimestamp = new Record(
- logEntry.record().buffer(),
- wrapperRecordTimestamp,
- entry.record().timestampType()
- );
- logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
- }
- logEntries.add(logEntry);
- } catch (EOFException e) {
- break;
- }
- }
- if (entry.record().magic() > Record.MAGIC_VALUE_V0)
- this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
- else
- this.absoluteBaseOffset = -1;
- } catch (IOException e) {
- throw new KafkaException(e);
- } finally {
- Utils.closeQuietly(stream, "records iterator stream");
- }
- }
+ }
- /*
- * Read the next record from the buffer.
- *
- * Note that in the compressed message set, each message value size is set as the size of the un-compressed
- * version of the message value, so when we do de-compression allocating an array of the specified size for
- * reading compressed value data is sufficient.
- */
- @Override
- protected LogEntry makeNext() {
- if (innerDone()) {
- try {
- LogEntry entry = getNextEntry();
- // No more record to return.
- if (entry == null)
- return allDone();
-
- // Convert offset to absolute offset if needed.
- if (absoluteBaseOffset >= 0) {
- long absoluteOffset = absoluteBaseOffset + entry.offset();
- entry = new LogEntry(absoluteOffset, entry.record());
- }
-
- // decide whether to go shallow or deep iteration if it is compressed
- CompressionType compression = entry.record().compressionType();
- if (compression == CompressionType.NONE || shallow) {
- return entry;
- } else {
- // init the inner iterator with the value payload of the message,
- // which will de-compress the payload to a set of messages;
- // since we assume nested compression is not allowed, the deep iterator
- // would not try to further decompress underlying messages
- // There will be at least one element in the inner iterator, so we don't
- // need to call hasNext() here.
- innerIter = new RecordsIterator(entry);
- return innerIter.next();
- }
- } catch (EOFException e) {
- return allDone();
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- } else {
- return innerIter.next();
- }
- }
+ @Override
+ public int hashCode() {
+ return buffer.hashCode();
+ }
- private LogEntry getNextEntry() throws IOException {
- if (logEntries != null)
- return getNextEntryFromEntryList();
- else
- return getNextEntryFromStream();
- }
+ private static class ByteBufferLogInputStream implements LogInputStream {
+ private final DataInputStream stream;
+ private final ByteBuffer buffer;
- private LogEntry getNextEntryFromEntryList() {
- return logEntries.isEmpty() ? null : logEntries.remove();
+ private ByteBufferLogInputStream(ByteBuffer buffer) {
+ this.stream = new DataInputStream(new ByteBufferInputStream(buffer));
+ this.buffer = buffer;
}
- private LogEntry getNextEntryFromStream() throws IOException {
- // read the offset
+ public LogEntry nextEntry() throws IOException {
long offset = stream.readLong();
- // read record size
int size = stream.readInt();
if (size < 0)
throw new IllegalStateException("Record with size " + size);
- // read the record, if compression is used we cannot depend on size
- // and hence has to do extra copy
- ByteBuffer rec;
- if (type == CompressionType.NONE) {
- rec = buffer.slice();
- int newPos = buffer.position() + size;
- if (newPos > buffer.limit())
- return null;
- buffer.position(newPos);
- rec.limit(size);
- } else {
- byte[] recordBuffer = new byte[size];
- stream.readFully(recordBuffer, 0, size);
- rec = ByteBuffer.wrap(recordBuffer);
- }
- return new LogEntry(offset, new Record(rec));
- }
- private boolean innerDone() {
- return innerIter == null || !innerIter.hasNext();
+ ByteBuffer slice = buffer.slice();
+ int newPos = buffer.position() + size;
+ if (newPos > buffer.limit())
+ return null;
+ buffer.position(newPos);
+ slice.limit(size);
+ return new LogEntry(offset, new Record(slice));
}
}
}
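
[Editor's note: with Records extending Iterable<LogEntry>, the new iterator plumbing is exercised with a plain for-each. A hedged usage sketch, not part of the commit; for non-empty data the buffer is assumed to hold entries in the log format.]

    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class IterateRecords {
        public static void main(String[] args) {
            // The new EMPTY constant: a readable record set over a zero-length
            // buffer, so the loop below simply yields no entries.
            MemoryRecords records = MemoryRecords.EMPTY;
            for (LogEntry entry : records)
                System.out.println(entry.offset() + ": " + entry.record());

            // For real data, wrap the raw bytes instead:
            // MemoryRecords.readableRecords(ByteBuffer.wrap(bytes))
        }
    }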
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index d43cdab..3bc043f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.record;
+import java.io.IOException;
+import java.nio.channels.GatheringByteChannel;
+
/**
* A binary format which consists of a 4 byte size, an 8 byte offset, and the record bytes. See {@link MemoryRecords}
* for the in-memory representation.
@@ -28,7 +31,19 @@ public interface Records extends Iterable<LogEntry> {
/**
* The size of these records in bytes
+ * @return The size in bytes
*/
int sizeInBytes();
+ /**
+ * Write the messages in this set to the given channel starting at the given offset byte.
+ * @param channel The channel to write to
+ * @param position The position within this record set to begin writing from
+ * @param length The number of bytes to write
+ * @return The number of bytes written to the channel (which may be fewer than requested)
+ * @throws IOException For any IO errors copying the records to the channel
+ */
+ long writeTo(GatheringByteChannel channel, long position, int length) throws IOException;
+
+
}
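
[Editor's note: the javadoc warns that writeTo may write fewer bytes than requested (the MemoryRecords implementation above issues a single channel.write on a duplicated buffer), so a caller that needs the full range must loop. A sketch of that loop, assuming any GatheringByteChannel destination:]

    import java.io.IOException;
    import java.nio.channels.GatheringByteChannel;
    import org.apache.kafka.common.record.Records;

    public class WriteFully {
        // Keep calling writeTo until `length` bytes starting at `position`
        // have reached the channel; each call may make partial progress.
        static void writeFully(Records records, GatheringByteChannel channel,
                               long position, int length) throws IOException {
            long pos = position;
            int remaining = length;
            while (remaining > 0) {
                long written = records.writeTo(channel, pos, remaining);
                pos += written;
                remaining -= (int) written;
            }
        }
    }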
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
new file mode 100644
index 0000000..1bc8a65
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+
+public class RecordsIterator extends AbstractIterator<LogEntry> {
+ private final LogInputStream logStream;
+ private final boolean shallow;
+ private DeepRecordsIterator innerIter;
+
+ public RecordsIterator(LogInputStream logStream, boolean shallow) {
+ this.logStream = logStream;
+ this.shallow = shallow;
+ }
+
+ /*
+ * Read the next record from the buffer.
+ *
+ * Note that in the compressed message set, each message value size is set to the size of the uncompressed
+ * version of the message value, so when we decompress, allocating an array of the specified size for
+ * reading the compressed value data is sufficient.
+ */
+ @Override
+ protected LogEntry makeNext() {
+ if (innerDone()) {
+ try {
+ LogEntry entry = logStream.nextEntry();
+ // No more records to return.
+ if (entry == null)
+ return allDone();
+
+ // decide whether to go shallow or deep iteration if it is compressed
+ CompressionType compressionType = entry.record().compressionType();
+ if (compressionType == CompressionType.NONE || shallow) {
+ return entry;
+ } else {
+ // init the inner iterator with the value payload of the message,
+ // which will de-compress the payload to a set of messages;
+ // since we assume nested compression is not allowed, the deep iterator
+ // would not try to further decompress underlying messages
+ // There will be at least one element in the inner iterator, so we don't
+ // need to call hasNext() here.
+ innerIter = new DeepRecordsIterator(entry);
+ return innerIter.next();
+ }
+ } catch (EOFException e) {
+ return allDone();
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
+ } else {
+ return innerIter.next();
+ }
+ }
+
+ private boolean innerDone() {
+ return innerIter == null || !innerIter.hasNext();
+ }
+
+ private static class DataLogInputStream implements LogInputStream {
+ private final DataInputStream stream;
+
+ private DataLogInputStream(DataInputStream stream) {
+ this.stream = stream;
+ }
+
+ public LogEntry nextEntry() throws IOException {
+ long offset = stream.readLong();
+ int size = stream.readInt();
+ if (size < 0)
+ throw new IllegalStateException("Record with size " + size);
+
+ byte[] recordBuffer = new byte[size];
+ stream.readFully(recordBuffer, 0, size);
+ ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
+ return new LogEntry(offset, new Record(buf));
+ }
+ }
+
+ private static class DeepRecordsIterator extends AbstractIterator<LogEntry> {
+ private final ArrayDeque<LogEntry> logEntries;
+ private final long absoluteBaseOffset;
+
+ private DeepRecordsIterator(LogEntry entry) {
+ CompressionType compressionType = entry.record().compressionType();
+ ByteBuffer buffer = entry.record().value();
+ DataInputStream stream = Compressor.wrapForInput(new ByteBufferInputStream(buffer), compressionType, entry.record().magic());
+ LogInputStream logStream = new DataLogInputStream(stream);
+
+ long wrapperRecordOffset = entry.offset();
+ long wrapperRecordTimestamp = entry.record().timestamp();
+ this.logEntries = new ArrayDeque<>();
+
+ // If relative offset is used, we need to decompress the entire message first to compute
+ // the absolute offset. For simplicity and because it's a format that is on its way out, we
+ // do the same for message format version 0
+ try {
+ while (true) {
+ try {
+ LogEntry logEntry = logStream.nextEntry();
+ if (entry.record().magic() > Record.MAGIC_VALUE_V0) {
+ Record recordWithTimestamp = new Record(
+ logEntry.record().buffer(),
+ wrapperRecordTimestamp,
+ entry.record().timestampType()
+ );
+ logEntry = new LogEntry(logEntry.offset(), recordWithTimestamp);
+ }
+ logEntries.add(logEntry);
+ } catch (EOFException e) {
+ break;
+ }
+ }
+ if (entry.record().magic() > Record.MAGIC_VALUE_V0)
+ this.absoluteBaseOffset = wrapperRecordOffset - logEntries.getLast().offset();
+ else
+ this.absoluteBaseOffset = -1;
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ } finally {
+ Utils.closeQuietly(stream, "records iterator stream");
+ }
+ }
+
+ @Override
+ protected LogEntry makeNext() {
+ if (logEntries.isEmpty())
+ return allDone();
+
+ LogEntry entry = logEntries.remove();
+
+ // Convert offset to absolute offset if needed.
+ if (absoluteBaseOffset >= 0) {
+ long absoluteOffset = absoluteBaseOffset + entry.offset();
+ entry = new LogEntry(absoluteOffset, entry.record());
+ }
+
+ // decide whether to go shallow or deep iteration if it is compressed
+ CompressionType compression = entry.record().compressionType();
+ if (compression != CompressionType.NONE)
+ throw new InvalidRecordException("Inner messages must not be compressed");
+
+ return entry;
+ }
+ }
+
+}
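
[Editor's note: the offset handling in DeepRecordsIterator reduces to simple arithmetic. The wrapper record carries the absolute offset of the last inner message, while inner messages (format v1 and above) carry offsets relative to their position in the compressed set. A worked sketch with assumed numbers:]

    public class RelativeOffsets {
        public static void main(String[] args) {
            // Assumed example: a compressed wrapper at absolute offset 105
            // holding three inner messages with relative offsets 0, 1, 2.
            long wrapperRecordOffset = 105;
            long lastInnerRelativeOffset = 2;

            // As in the DeepRecordsIterator constructor:
            long absoluteBaseOffset = wrapperRecordOffset - lastInnerRelativeOffset; // 103

            // makeNext() then shifts each inner entry by the base:
            for (long relative = 0; relative <= 2; relative++)
                System.out.println(relative + " -> " + (absoluteBaseOffset + relative));
            // prints 0 -> 103, 1 -> 104, 2 -> 105
        }
    }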
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index e6febe5..80182e6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
@@ -27,10 +29,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
super(struct);
}
+ public Send toSend(String destination, RequestHeader header) {
+ return new NetworkSend(destination, serialize(header, this));
+ }
+
/**
* Get an error response for a request for a given api version
*/
- public abstract AbstractRequestResponse getErrorResponse(int versionId, Throwable e);
+ public abstract AbstractResponse getErrorResponse(int versionId, Throwable e);
/**
* Factory method for getting a request object based on ApiKey ID and a buffer
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
index 37aff6c..3ad16a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -19,7 +19,6 @@ import java.nio.ByteBuffer;
public abstract class AbstractRequestResponse {
protected final Struct struct;
-
public AbstractRequestResponse(Struct struct) {
this.struct = struct;
}
@@ -63,4 +62,12 @@ public abstract class AbstractRequestResponse {
AbstractRequestResponse other = (AbstractRequestResponse) obj;
return struct.equals(other.struct);
}
+
+ public static ByteBuffer serialize(AbstractRequestResponse header, AbstractRequestResponse body) {
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
+ header.writeTo(buffer);
+ body.writeTo(buffer);
+ buffer.rewind();
+ return buffer;
+ }
}
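
[Editor's note: the new serialize helper simply lays the header and body back to back in one buffer sized by their sizeOf() methods, then rewinds for reading. A self-contained sketch of the same pattern, with a hypothetical Writable interface standing in for AbstractRequestResponse:]

    import java.nio.ByteBuffer;

    public class SerializeSketch {
        interface Writable {                  // stand-in, not a Kafka type
            int sizeOf();
            void writeTo(ByteBuffer buffer);
        }

        static ByteBuffer serialize(Writable header, Writable body) {
            ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
            header.writeTo(buffer);           // header bytes first
            body.writeTo(buffer);             // then the body, back to back
            buffer.rewind();                  // ready to be read or sent
            return buffer;
        }
    }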
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
new file mode 100644
index 0000000..8bbc25a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+public abstract class AbstractResponse extends AbstractRequestResponse {
+
+ public AbstractResponse(Struct struct) {
+ super(struct);
+ }
+
+ public Send toSend(String destination, RequestHeader request) {
+ ResponseHeader responseHeader = new ResponseHeader(request.correlationId());
+ return new NetworkSend(destination, serialize(responseHeader, this));
+ }
+
+ public static AbstractResponse getResponse(int requestId, Struct struct) {
+ ApiKeys apiKey = ApiKeys.forId(requestId);
+ switch (apiKey) {
+ case PRODUCE:
+ return new ProduceResponse(struct);
+ case FETCH:
+ return new FetchResponse(struct);
+ case LIST_OFFSETS:
+ return new ListOffsetResponse(struct);
+ case METADATA:
+ return new MetadataResponse(struct);
+ case OFFSET_COMMIT:
+ return new OffsetCommitResponse(struct);
+ case OFFSET_FETCH:
+ return new OffsetFetchResponse(struct);
+ case GROUP_COORDINATOR:
+ return new GroupCoordinatorResponse(struct);
+ case JOIN_GROUP:
+ return new JoinGroupResponse(struct);
+ case HEARTBEAT:
+ return new HeartbeatResponse(struct);
+ case LEAVE_GROUP:
+ return new LeaveGroupResponse(struct);
+ case SYNC_GROUP:
+ return new SyncGroupResponse(struct);
+ case STOP_REPLICA:
+ return new StopReplicaResponse(struct);
+ case CONTROLLED_SHUTDOWN_KEY:
+ return new ControlledShutdownResponse(struct);
+ case UPDATE_METADATA_KEY:
+ return new UpdateMetadataResponse(struct);
+ case LEADER_AND_ISR:
+ return new LeaderAndIsrResponse(struct);
+ case DESCRIBE_GROUPS:
+ return new DescribeGroupsResponse(struct);
+ case LIST_GROUPS:
+ return new ListGroupsResponse(struct);
+ case SASL_HANDSHAKE:
+ return new SaslHandshakeResponse(struct);
+ case API_VERSIONS:
+ return new ApiVersionsResponse(struct);
+ case CREATE_TOPICS:
+ return new CreateTopicsResponse(struct);
+ case DELETE_TOPICS:
+ return new DeleteTopicsResponse(struct);
+ default:
+ throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
+ "code should be updated to do so.", apiKey));
+ }
+ }
+
+}
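
[Editor's note: on the receive side, a caller that has the response body in a buffer can parse it with the schema for the request's version and let getResponse pick the concrete type. A hedged usage sketch following the same parse-then-wrap pattern as FetchResponse.parse further down:]

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.ProtoUtils;
    import org.apache.kafka.common.protocol.types.Struct;
    import org.apache.kafka.common.requests.AbstractResponse;

    public class DispatchSketch {
        static AbstractResponse parseResponse(int apiKey, int version, ByteBuffer body) {
            // Read the body with the versioned schema, then dispatch on api key
            Struct struct = ProtoUtils.responseSchema(apiKey, version).read(body);
            return AbstractResponse.getResponse(apiKey, struct);
        }
    }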
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index b78c759..d9ef37e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -34,7 +34,7 @@ public class ApiVersionsRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
short errorCode = Errors.forException(e).code();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index fe995b2..0bf1039 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -27,7 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ApiVersionsResponse extends AbstractRequestResponse {
+public class ApiVersionsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
private static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 9ac127d..c2ace32 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -42,7 +42,7 @@ public class ControlledShutdownRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
throw new IllegalArgumentException(String.format("Version 0 is not supported. It is only supported by " +
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 862264f..1996f82 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-public class ControlledShutdownResponse extends AbstractRequestResponse {
+public class ControlledShutdownResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 3977835..7c440dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -194,7 +194,7 @@ public class CreateTopicsRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<>();
for (String topic : topics.keySet()) {
topicErrors.put(topic, Errors.forException(e));
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index da8c3ce..303b779 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class CreateTopicsResponse extends AbstractRequestResponse {
+public class CreateTopicsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPICS.id);
private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index f78c428..0632cc0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -58,7 +58,7 @@ public class DeleteTopicsRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<String, Errors> topicErrors = new HashMap<>();
for (String topic : topics)
topicErrors.put(topic, Errors.forException(e));
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index e474feb..ed6a63d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -28,7 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class DeleteTopicsResponse extends AbstractRequestResponse {
+public class DeleteTopicsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DELETE_TOPICS.id);
private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
private static final String TOPIC_KEY_NAME = "topic";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index a870b8f..b965c91 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -46,7 +46,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 2d4faee..2eff628 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class DescribeGroupsResponse extends AbstractRequestResponse {
+public class DescribeGroupsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index dcdfd9c..fd4c747 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
public class FetchRequest extends AbstractRequest {
@@ -80,7 +81,7 @@ public class FetchRequest extends AbstractRequest {
int partition = topicEntry.getKey().partition();
T partitionData = topicEntry.getValue();
if (topics.isEmpty() || !topics.get(topics.size() - 1).topic.equals(topic))
- topics.add(new TopicAndPartitionData(topic));
+ topics.add(new TopicAndPartitionData<T>(topic));
topics.get(topics.size() - 1).partitions.put(partition, partitionData);
}
return topics;
@@ -131,11 +132,11 @@ public class FetchRequest extends AbstractRequest {
struct.set(MIN_BYTES_KEY_NAME, minBytes);
if (version >= 3)
struct.set(MAX_BYTES_KEY_NAME, maxBytes);
- List<Struct> topicArray = new ArrayList<Struct>();
+ List<Struct> topicArray = new ArrayList<>();
for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
- List<Struct> partitionArray = new ArrayList<Struct>();
+ List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
PartitionData fetchPartitionData = partitionEntry.getValue();
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -180,13 +181,13 @@ public class FetchRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
- FetchResponse.INVALID_HIGHWATERMARK,
- FetchResponse.EMPTY_RECORD_SET);
+ FetchResponse.INVALID_HIGHWATERMARK, MemoryRecords.EMPTY);
+
responseData.put(entry.getKey(), partitionResponse);
}
@@ -223,6 +224,10 @@ public class FetchRequest extends AbstractRequest {
return fetchData;
}
+ public boolean isFromFollower() {
+ return replicaId >= 0;
+ }
+
public static FetchRequest parse(ByteBuffer buffer, int versionId) {
return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
}
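
[Editor's note: the new isFromFollower helper relies on the replica id convention: brokers fetch as followers with their non-negative broker id, while ordinary consumers use a negative sentinel (conventionally -1, stated here as an assumption). A tiny illustration of the sign check:]

    public class ReplicaIdSketch {
        static boolean isFromFollower(int replicaId) {
            return replicaId >= 0;   // same check as FetchRequest.isFromFollower
        }

        public static void main(String[] args) {
            System.out.println(isFromFollower(1));   // true: follower fetch from broker 1
            System.out.println(isFromFollower(-1));  // false: a regular consumer fetch
        }
    }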
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 111d197..ec2ab47 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -17,10 +17,15 @@
package org.apache.kafka.common.requests;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.MultiSend;
+import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.record.Records;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -29,9 +34,9 @@ import java.util.List;
import java.util.Map;
/**
- * This wrapper supports both v0 and v1 of FetchResponse.
+ * This wrapper supports all versions of the Fetch API
*/
-public class FetchResponse extends AbstractRequestResponse {
+public class FetchResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
private static final String RESPONSES_KEY_NAME = "responses";
@@ -42,14 +47,15 @@ public class FetchResponse extends AbstractRequestResponse {
private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
// partition level field names
+ private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
private static final String PARTITION_KEY_NAME = "partition";
private static final String ERROR_CODE_KEY_NAME = "error_code";
// Default throttle time
private static final int DEFAULT_THROTTLE_TIME = 0;
- /**
- * Possible error code:
+ /**
+ * Possible error codes:
*
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
@@ -62,7 +68,6 @@ public class FetchResponse extends AbstractRequestResponse {
private static final String RECORD_SET_KEY_NAME = "record_set";
public static final long INVALID_HIGHWATERMARK = -1L;
- public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
private final LinkedHashMap<TopicPartition, PartitionData> responseData;
private final int throttleTime;
@@ -70,12 +75,12 @@ public class FetchResponse extends AbstractRequestResponse {
public static final class PartitionData {
public final short errorCode;
public final long highWatermark;
- public final ByteBuffer recordSet;
+ public final Records records;
- public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
+ public PartitionData(short errorCode, long highWatermark, Records records) {
this.errorCode = errorCode;
this.highWatermark = highWatermark;
- this.recordSet = recordSet;
+ this.records = records;
}
}
@@ -106,32 +111,9 @@ public class FetchResponse extends AbstractRequestResponse {
this(3, responseData, throttleTime);
}
- private FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
+ public FetchResponse(int version, LinkedHashMap<TopicPartition, PartitionData> responseData, int throttleTime) {
super(new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version)));
-
- List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
- List<Struct> topicArray = new ArrayList<>();
- for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
- Struct topicData = struct.instance(RESPONSES_KEY_NAME);
- topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
- List<Struct> partitionArray = new ArrayList<>();
- for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
- PartitionData fetchPartitionData = partitionEntry.getValue();
- Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
- partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
- partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
- partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
- partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
- partitionArray.add(partitionData);
- }
- topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
- topicArray.add(topicData);
- }
- struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-
- if (version >= 1)
- struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
-
+ writeStruct(struct, version, responseData, throttleTime);
this.responseData = responseData;
this.throttleTime = throttleTime;
}
@@ -144,11 +126,12 @@ public class FetchResponse extends AbstractRequestResponse {
String topic = topicResponse.getString(TOPIC_KEY_NAME);
for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
- int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
- long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
- ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
- PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
+ Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
+ int partition = partitionResponseHeader.getInt(PARTITION_KEY_NAME);
+ short errorCode = partitionResponseHeader.getShort(ERROR_CODE_KEY_NAME);
+ long highWatermark = partitionResponseHeader.getLong(HIGH_WATERMARK_KEY_NAME);
+ Records records = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
+ PartitionData partitionData = new PartitionData(errorCode, highWatermark, records);
responseData.put(new TopicPartition(topic, partition), partitionData);
}
}
@@ -156,6 +139,22 @@ public class FetchResponse extends AbstractRequestResponse {
this.throttleTime = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
}
+ @Override
+ public Send toSend(String dest, RequestHeader requestHeader) {
+ ResponseHeader responseHeader = new ResponseHeader(requestHeader.correlationId());
+
+ // write the total size and the response header
+ ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4);
+ buffer.putInt(responseHeader.sizeOf() + struct.sizeOf());
+ responseHeader.writeTo(buffer);
+ buffer.rewind();
+
+ List<Send> sends = new ArrayList<>();
+ sends.add(new ByteBufferSend(dest, buffer));
+ addResponseData(dest, sends);
+ return new MultiSend(dest, sends);
+ }
+
public LinkedHashMap<TopicPartition, PartitionData> responseData() {
return responseData;
}
@@ -171,4 +170,92 @@ public class FetchResponse extends AbstractRequestResponse {
public static FetchResponse parse(ByteBuffer buffer, int version) {
return new FetchResponse(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version).read(buffer));
}
+
+ private void addResponseData(String dest, List<Send> sends) {
+ Object[] allTopicData = struct.getArray(RESPONSES_KEY_NAME);
+
+ if (struct.hasField(THROTTLE_TIME_KEY_NAME)) {
+ int throttleTime = struct.getInt(THROTTLE_TIME_KEY_NAME);
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putInt(throttleTime);
+ buffer.putInt(allTopicData.length);
+ buffer.rewind();
+ sends.add(new ByteBufferSend(dest, buffer));
+ } else {
+ ByteBuffer buffer = ByteBuffer.allocate(4);
+ buffer.putInt(allTopicData.length);
+ buffer.rewind();
+ sends.add(new ByteBufferSend(dest, buffer));
+ }
+
+ for (Object topicData : allTopicData)
+ addTopicData(dest, sends, (Struct) topicData);
+ }
+
+ private void addTopicData(String dest, List<Send> sends, Struct topicData) {
+ String topic = topicData.getString(TOPIC_KEY_NAME);
+ Object[] allPartitionData = topicData.getArray(PARTITIONS_KEY_NAME);
+
+ // include the topic header and the count for the number of partitions
+ ByteBuffer buffer = ByteBuffer.allocate(Type.STRING.sizeOf(topic) + 4);
+ Type.STRING.write(buffer, topic);
+ buffer.putInt(allPartitionData.length);
+ buffer.rewind();
+ sends.add(new ByteBufferSend(dest, buffer));
+
+ for (Object partitionData : allPartitionData)
+ addPartitionData(dest, sends, (Struct) partitionData);
+ }
+
+ private void addPartitionData(String dest, List<Send> sends, Struct partitionData) {
+ Struct header = partitionData.getStruct(PARTITION_HEADER_KEY_NAME);
+ Records records = partitionData.getRecords(RECORD_SET_KEY_NAME);
+
+ // include the partition header and the size of the record set
+ ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 4);
+ header.writeTo(buffer);
+ buffer.putInt(records.sizeInBytes());
+ buffer.rewind();
+ sends.add(new ByteBufferSend(dest, buffer));
+
+ // finally the send for the record set itself
+ sends.add(new RecordsSend(dest, records));
+ }
+
+ private static void writeStruct(Struct struct,
+ int version,
+ LinkedHashMap<TopicPartition, PartitionData> responseData,
+ int throttleTime) {
+ List<FetchRequest.TopicAndPartitionData<PartitionData>> topicsData = FetchRequest.TopicAndPartitionData.batchByTopic(responseData);
+ List<Struct> topicArray = new ArrayList<>();
+ for (FetchRequest.TopicAndPartitionData<PartitionData> topicEntry: topicsData) {
+ Struct topicData = struct.instance(RESPONSES_KEY_NAME);
+ topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+ List<Struct> partitionArray = new ArrayList<>();
+ for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.partitions.entrySet()) {
+ PartitionData fetchPartitionData = partitionEntry.getValue();
+ Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+ Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
+ partitionDataHeader.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+ partitionDataHeader.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
+ partitionDataHeader.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
+ partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
+ partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
+ partitionArray.add(partitionData);
+ }
+ topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+ topicArray.add(topicData);
+ }
+ struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
+
+ if (version >= 1)
+ struct.set(THROTTLE_TIME_KEY_NAME, throttleTime);
+ }
+
+ public static int sizeOf(int version, LinkedHashMap<TopicPartition, PartitionData> responseData) {
+ Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.FETCH.id, version));
+ writeStruct(struct, version, responseData, 0);
+ return 4 + struct.sizeOf();
+ }
+
}
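
[Editor's note: the overridden toSend is the heart of this change. Instead of copying every record set into one big response buffer, the response is composed as a sequence of small framing buffers plus one send per record set, so file-backed records can later use zero-copy transfer. A self-contained sketch of that composition pattern; the Send interface here is a hypothetical stand-in, not the Kafka network API.]

    import java.io.IOException;
    import java.nio.channels.GatheringByteChannel;
    import java.util.ArrayList;
    import java.util.List;

    public class MultiSendSketch {
        interface Send {
            boolean completed();
            long writeTo(GatheringByteChannel channel) throws IOException;
        }

        // Writes each piece in order, mirroring how addResponseData and
        // addPartitionData above queue framing buffers and record sets.
        static class MultiSend implements Send {
            private final List<Send> sends;
            private int current = 0;

            MultiSend(List<Send> sends) {
                this.sends = new ArrayList<>(sends);
            }

            public boolean completed() {
                return current >= sends.size();
            }

            public long writeTo(GatheringByteChannel channel) throws IOException {
                long written = 0;
                while (!completed()) {
                    Send send = sends.get(current);
                    written += send.writeTo(channel);
                    if (!send.completed())
                        break;                 // channel full; resume on next call
                    current++;
                }
                return written;
            }
        }
    }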
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
index 0b98e55..7fee476 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorRequest.java
@@ -41,7 +41,7 @@ public class GroupCoordinatorRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
return new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code(), Node.noNode());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
index 8e7beb4..1f447f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/GroupCoordinatorResponse.java
@@ -20,7 +20,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-public class GroupCoordinatorResponse extends AbstractRequestResponse {
+public class GroupCoordinatorResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.GROUP_COORDINATOR.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
index 02eaa99..3e7401c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java
@@ -49,7 +49,7 @@ public class HeartbeatRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
return new HeartbeatResponse(Errors.forException(e).code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index 7fe227c..72f0175 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -19,7 +19,7 @@ import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-public class HeartbeatResponse extends AbstractRequestResponse {
+public class HeartbeatResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
index 2845ee0..51855b6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
@@ -142,7 +142,7 @@ public class JoinGroupRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
case 1:
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index 8895ace..e7fc5b1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -23,7 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class JoinGroupResponse extends AbstractRequestResponse {
+public class JoinGroupResponse extends AbstractResponse {
private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id);
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index 52b9674..79dcd4a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -145,7 +145,7 @@ public class LeaderAndIsrRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size());
for (TopicPartition partition : partitionStates.keySet()) {
responses.put(partition, Errors.forException(e).code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
index df57714..a754def 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class LeaderAndIsrResponse extends AbstractRequestResponse {
+public class LeaderAndIsrResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
index 3047193..6a3f8a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupRequest.java
@@ -43,7 +43,7 @@ public class LeaveGroupRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
return new LeaveGroupResponse(Errors.forException(e).code());
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index 6481ca7..9c7998b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -18,7 +18,7 @@ import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
-public class LeaveGroupResponse extends AbstractRequestResponse {
+public class LeaveGroupResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEAVE_GROUP.id);
private static final String ERROR_CODE_KEY_NAME = "error_code";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
index 3160702..3fd3b81 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsRequest.java
@@ -34,7 +34,7 @@ public class ListGroupsRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
switch (versionId) {
case 0:
short errorCode = Errors.forException(e).code();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 5519670..98573f8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class ListGroupsResponse extends AbstractRequestResponse {
+public class ListGroupsResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_GROUPS.id);
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 1aed523..c1db82d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -166,7 +166,7 @@ public class ListOffsetRequest extends AbstractRequest {
@Override
@SuppressWarnings("deprecation")
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
if (versionId == 0) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index dbeef05..0b257bc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -29,7 +29,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class ListOffsetResponse extends AbstractRequestResponse {
+public class ListOffsetResponse extends AbstractResponse {
public static final long UNKNOWN_TIMESTAMP = -1L;
public static final long UNKNOWN_OFFSET = -1L;
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 24a9bfc..f7d8f8b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -64,7 +64,7 @@ public class MetadataRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
List<MetadataResponse.TopicMetadata> topicMetadatas = new ArrayList<>();
Errors error = Errors.forException(e);
List<MetadataResponse.PartitionMetadata> partitions = Collections.emptyList();
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 444941b..a8baee5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -29,7 +29,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public class MetadataResponse extends AbstractRequestResponse {
+public class MetadataResponse extends AbstractResponse {
private static final short CURRENT_VERSION = ProtoUtils.latestVersion(ApiKeys.METADATA.id);
private static final String BROKERS_KEY_NAME = "brokers";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index df18486..1dfad1e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -154,7 +154,7 @@ public class OffsetCommitRequest extends AbstractRequest {
for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
+ List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
PartitionData fetchPartitionData = partitionEntry.getValue();
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
@@ -194,7 +194,7 @@ public class OffsetCommitRequest extends AbstractRequest {
else
retentionTime = DEFAULT_RETENTION_TIME;
- offsetData = new HashMap<TopicPartition, PartitionData>();
+ offsetData = new HashMap<>();
for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicData = (Struct) topicDataObj;
String topic = topicData.getString(TOPIC_KEY_NAME);
@@ -217,8 +217,8 @@ public class OffsetCommitRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, Short> responseData = new HashMap<>();
for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
responseData.put(entry.getKey(), Errors.forException(e).code());
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 1dfda93..abb260e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class OffsetCommitResponse extends AbstractRequestResponse {
+public class OffsetCommitResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
private static final String RESPONSES_KEY_NAME = "responses";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 422328e..ede0f27 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -51,11 +51,11 @@ public class OffsetFetchRequest extends AbstractRequest {
Map<String, List<Integer>> topicsData = CollectionUtils.groupDataByTopic(partitions);
struct.set(GROUP_ID_KEY_NAME, groupId);
- List<Struct> topicArray = new ArrayList<Struct>();
+ List<Struct> topicArray = new ArrayList<>();
for (Map.Entry<String, List<Integer>> entries: topicsData.entrySet()) {
Struct topicData = struct.instance(TOPICS_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, entries.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
+ List<Struct> partitionArray = new ArrayList<>();
for (Integer partitionId : entries.getValue()) {
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
partitionData.set(PARTITION_KEY_NAME, partitionId);
@@ -71,7 +71,7 @@ public class OffsetFetchRequest extends AbstractRequest {
public OffsetFetchRequest(Struct struct) {
super(struct);
- partitions = new ArrayList<TopicPartition>();
+ partitions = new ArrayList<>();
for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
Struct topicResponse = (Struct) topicResponseObj;
String topic = topicResponse.getString(TOPIC_KEY_NAME);
@@ -85,8 +85,8 @@ public class OffsetFetchRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
- Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<TopicPartition, OffsetFetchResponse.PartitionData>();
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
+ Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
for (TopicPartition partition: partitions) {
responseData.put(partition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 1715777..ae4e066 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.CollectionUtils;
-public class OffsetFetchResponse extends AbstractRequestResponse {
+public class OffsetFetchResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_FETCH.id);
private static final String RESPONSES_KEY_NAME = "responses";
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index c7d41e6..25209ce 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -19,6 +19,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ProtoUtils;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.utils.CollectionUtils;
@@ -45,23 +46,23 @@ public class ProduceRequest extends AbstractRequest {
private final short acks;
private final int timeout;
- private final Map<TopicPartition, ByteBuffer> partitionRecords;
+ private final Map<TopicPartition, MemoryRecords> partitionRecords;
- public ProduceRequest(short acks, int timeout, Map<TopicPartition, ByteBuffer> partitionRecords) {
+ public ProduceRequest(short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords) {
super(new Struct(CURRENT_SCHEMA));
- Map<String, Map<Integer, ByteBuffer>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
+ Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
struct.set(ACKS_KEY_NAME, acks);
struct.set(TIMEOUT_KEY_NAME, timeout);
- List<Struct> topicDatas = new ArrayList<Struct>(recordsByTopic.size());
- for (Map.Entry<String, Map<Integer, ByteBuffer>> entry : recordsByTopic.entrySet()) {
+ List<Struct> topicDatas = new ArrayList<>(recordsByTopic.size());
+ for (Map.Entry<String, Map<Integer, MemoryRecords>> entry : recordsByTopic.entrySet()) {
Struct topicData = struct.instance(TOPIC_DATA_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, entry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
- for (Map.Entry<Integer, ByteBuffer> partitionEntry : entry.getValue().entrySet()) {
- ByteBuffer buffer = partitionEntry.getValue().duplicate();
+ List<Struct> partitionArray = new ArrayList<>();
+ for (Map.Entry<Integer, MemoryRecords> partitionEntry : entry.getValue().entrySet()) {
+ MemoryRecords records = partitionEntry.getValue();
Struct part = topicData.instance(PARTITION_DATA_KEY_NAME)
.set(PARTITION_KEY_NAME, partitionEntry.getKey())
- .set(RECORD_SET_KEY_NAME, buffer);
+ .set(RECORD_SET_KEY_NAME, records);
partitionArray.add(part);
}
topicData.set(PARTITION_DATA_KEY_NAME, partitionArray.toArray());
@@ -75,14 +76,14 @@ public class ProduceRequest extends AbstractRequest {
public ProduceRequest(Struct struct) {
super(struct);
- partitionRecords = new HashMap<TopicPartition, ByteBuffer>();
+ partitionRecords = new HashMap<>();
for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
Struct topicData = (Struct) topicDataObj;
String topic = topicData.getString(TOPIC_KEY_NAME);
for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
Struct partitionResponse = (Struct) partitionResponseObj;
int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
- ByteBuffer records = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
+ MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
partitionRecords.put(new TopicPartition(topic, partition), records);
}
}
@@ -91,14 +92,14 @@ public class ProduceRequest extends AbstractRequest {
}
@Override
- public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
+ public AbstractResponse getErrorResponse(int versionId, Throwable e) {
/* In case the producer doesn't actually want any response */
if (acks == 0)
return null;
- Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<TopicPartition, ProduceResponse.PartitionResponse>();
+ Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>();
- for (Map.Entry<TopicPartition, ByteBuffer> entry : partitionRecords.entrySet()) {
+ for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet()) {
responseMap.put(entry.getKey(), new ProduceResponse.PartitionResponse(Errors.forException(e).code(), ProduceResponse.INVALID_OFFSET, Record.NO_TIMESTAMP));
}
@@ -122,7 +123,7 @@ public class ProduceRequest extends AbstractRequest {
return timeout;
}
- public Map<TopicPartition, ByteBuffer> partitionRecords() {
+ public Map<TopicPartition, MemoryRecords> partitionRecords() {
return partitionRecords;
}
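Taken together, the ProduceRequest hunks swap the raw ByteBuffer payload for typed MemoryRecords end to end: construction, the Struct round-trip via the new getRecords accessor, the error path, and the partitionRecords() view. A short usage sketch against the new signature; the topic name and the acks/timeout values are illustrative, and an empty record set keeps it self-contained:

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.requests.ProduceRequest;

    public class ProduceRequestSketch {
        public static ProduceRequest build() {
            Map<TopicPartition, MemoryRecords> partitionRecords = new HashMap<>();

            // readableRecords wraps an already-encoded record set; a real
            // caller would pass a buffer of valid log entries.
            partitionRecords.put(new TopicPartition("my-topic", 0),
                    MemoryRecords.readableRecords(ByteBuffer.allocate(0)));

            // acks=1 with a 30s timeout -- illustrative values only.
            return new ProduceRequest((short) 1, 30000, partitionRecords);
        }
    }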
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 58175e1..71e6ab5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -28,7 +28,7 @@ import java.util.Map;
/**
* This wrapper supports both v0 and v1 of ProduceResponse.
*/
-public class ProduceResponse extends AbstractRequestResponse {
+public class ProduceResponse extends AbstractResponse {
private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id);
private static final String RESPONSES_KEY_NAME = "responses";
@@ -98,7 +98,7 @@ public class ProduceResponse extends AbstractRequestResponse {
*/
public ProduceResponse(Struct struct) {
super(struct);
- responses = new HashMap<TopicPartition, PartitionResponse>();
+ responses = new HashMap<>();
for (Object topicResponse : struct.getArray(RESPONSES_KEY_NAME)) {
Struct topicRespStruct = (Struct) topicResponse;
String topic = topicRespStruct.getString(TOPIC_KEY_NAME);
@@ -117,11 +117,11 @@ public class ProduceResponse extends AbstractRequestResponse {
private void initCommonFields(Map<TopicPartition, PartitionResponse> responses) {
Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
- List<Struct> topicDatas = new ArrayList<Struct>(responseByTopic.size());
+ List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
for (Map.Entry<String, Map<Integer, PartitionResponse>> entry : responseByTopic.entrySet()) {
Struct topicData = struct.instance(RESPONSES_KEY_NAME);
topicData.set(TOPIC_KEY_NAME, entry.getKey());
- List<Struct> partitionArray = new ArrayList<Struct>();
+ List<Struct> partitionArray = new ArrayList<>();
for (Map.Entry<Integer, PartitionResponse> partitionEntry : entry.getValue().entrySet()) {
PartitionResponse part = partitionEntry.getValue();
Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
new file mode 100644
index 0000000..1c3bb0d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RecordsSend.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.network.TransportLayer;
+import org.apache.kafka.common.record.Records;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+
+public class RecordsSend implements Send {
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+
+ private final String destination;
+ private final Records records;
+ private int remaining;
+ private boolean pending = false;
+
+ public RecordsSend(String destination, Records records) {
+ this.destination = destination;
+ this.records = records;
+ this.remaining = records.sizeInBytes();
+ }
+
+ @Override
+ public String destination() {
+ return destination;
+ }
+
+ @Override
+ public boolean completed() {
+ return remaining <= 0 && !pending;
+ }
+
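+ /**
+ * Write as much of the remaining payload as the channel will accept. The
+ * network layer may invoke this repeatedly until completed() returns true.
+ */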
+ @Override
+ public long writeTo(GatheringByteChannel channel) throws IOException {
+ long written = 0;
+ if (remaining > 0) {
+ written = records.writeTo(channel, size() - remaining, remaining);
+ if (written < 0)
+ throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
+ }
+
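+ // A TransportLayer (e.g. the SSL transport) can buffer bytes internally
+ // even after the payload has been fully handed off; the send is only
+ // complete once those pending writes reach the socket, so an empty write
+ // is used to flush them when no payload remains.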
+ if (channel instanceof TransportLayer) {
+ TransportLayer tl = (TransportLayer) channel;
+ pending = tl.hasPendingWrites();
+
+ if (remaining <= 0 && pending)
+ channel.write(EMPTY_BYTE_BUFFER);
+ }
+
+ remaining -= written;
+ return written;
+ }
+
+ @Override
+ public long size() {
+ return records.sizeInBytes();
+ }
+}
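RecordsSend rounds out the patch's plumbing: a Send that streams a Records payload (potentially zero-copy when the records are file-backed) straight to the channel. A hedged sketch of how a caller drives it; Kafka's real network layer is poll-driven and waits for channel writability rather than spinning:

    import java.io.IOException;
    import java.nio.channels.GatheringByteChannel;

    import org.apache.kafka.common.requests.RecordsSend;

    public class RecordsSendDriverSketch {
        // Illustrative only: drive a send to completion on a blocking channel.
        public static void drain(RecordsSend send, GatheringByteChannel channel)
                throws IOException {
            // writeTo may make partial progress per call (short writes, or a
            // TransportLayer holding pending buffered bytes), so loop on
            // completed() rather than on bytes written.
            while (!send.completed())
                send.writeTo(channel);
        }
    }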