Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/10 17:50:59 UTC
[GitHub] [kafka] mumrah opened a new pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah opened a new pull request #9008:
URL: https://github.com/apache/kafka/pull/9008
This change makes use of the generated protocols for FetchRequest and FetchResponse. The main challenge here was how to allow the transferrable bytes of the record set to be directly sent to the outgoing response without copying into a buffer.
The proposed solution is similar to the existing multi-send object used in [FetchResponse](TODO). However, a new writer class [RecordsWriter](TODO) was introduced to allow interleaving of ByteBufferSend (for headers and other non-record fields) along with RecordsSend-s which implement the efficient byte transfer.
Another change introduced here is that FetchRequest and FetchResponse do not maintain their own copies of the fields from the message. Instead, they hold a reference to the generated message class (FetchRequestData and FetchResponseData). Read-only copies of different forms of the message data are created once upon construction to allow for efficient access using the existing class methods.
For example, in FetchRequest we hold the FetchRequestData, but also compute and hold:
```java
private final FetchRequestData fetchRequestData;
// These are immutable read-only structures derived from FetchRequestData
private final Map<TopicPartition, PartitionData> fetchData;
private final List<TopicPartition> toForget;
private final FetchMetadata metadata;
```
And in FetchResponse, we similarly hold:
```java
private final FetchResponseData fetchResponseData;
private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
```
If we want, we could deprecate all the accessors on FetchRequest/FetchResponse and force callers to use the `#data()` method. This would eliminate the need for these additional data structures.
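As a rough illustration of the "hold the generated data, derive read-only views once" pattern described above, here is a minimal sketch. The classes `TopicEntry` and `PartitionEntry` and the `"topic-partition"` string key are hypothetical stand-ins for the generated message types and `TopicPartition`; this is not the code in the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class DerivedViewSketch {
    /** Hypothetical stand-in for a partition entry of a generated message. */
    static final class PartitionEntry {
        final int partition;
        final long fetchOffset;
        PartitionEntry(int partition, long fetchOffset) {
            this.partition = partition;
            this.fetchOffset = fetchOffset;
        }
    }

    /** Hypothetical stand-in for a topic entry of a generated message. */
    static final class TopicEntry {
        final String topic;
        final List<PartitionEntry> partitions;
        TopicEntry(String topic, List<PartitionEntry> partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    private final List<TopicEntry> data;          // reference to the "generated" message
    private final Map<String, Long> fetchOffsets; // read-only view derived once

    DerivedViewSketch(List<TopicEntry> data) {
        this.data = data;
        // LinkedHashMap keeps the order in which partitions appear in the message.
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (TopicEntry t : data) {
            for (PartitionEntry p : t.partitions) {
                offsets.put(t.topic + "-" + p.partition, p.fetchOffset);
            }
        }
        this.fetchOffsets = Collections.unmodifiableMap(offsets);
    }

    List<TopicEntry> data() {
        return data;
    }

    Map<String, Long> fetchOffsets() {
        return fetchOffsets;
    }

    public static void main(String[] args) {
        DerivedViewSketch request = new DerivedViewSketch(Arrays.asList(
            new TopicEntry("orders", Arrays.asList(
                new PartitionEntry(0, 100L), new PartitionEntry(1, 250L)))));
        System.out.println(request.fetchOffsets());
    }
}
```

The trade-off is the one discussed in this thread: the derived map costs time and memory at construction, and callers that read `data()` directly would not need it.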
Finally, most of the other changes fix up tests that were using invalid default values for protocol messages (which are now enforced, thanks to the generated classes) and rectify the JSON schema to match the actual defined `Schema`s (e.g., FETCH_RESPONSE_V11).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456224028
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
@mumrah : equality for the generated messages should mean bytewise equality. So if two FetchResponseData instances contain the same data, they should be equal, even if one is using MemoryRecords and the other is using FileRecords. Same for hashCode, of course.
If it's too much trouble to change the Records class, you can just write a static utility method in MessageUtils and invoke it from the generated classes. I expect that we won't be doing this kind of comparison except in tests, so you don't need to optimize the method too much.
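A static helper along the lines suggested above might look like the following sketch. It assumes both sides have already been materialized as `ByteBuffer`s (for file-backed records, that would mean reading them into memory first, which is only acceptable in tests). The class and method names are hypothetical and are not part of `MessageUtils`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper; names and signatures are illustrative only. */
final class RecordsEqualitySketch {
    private RecordsEqualitySketch() { }

    /** Compares the remaining bytes of two buffers, regardless of how they are backed. */
    static boolean bytewiseEquals(ByteBuffer a, ByteBuffer b) {
        if (a == b) return true;
        if (a == null || b == null) return false;
        // ByteBuffer.equals compares the remaining bytes pointwise;
        // duplicate() keeps the callers' position and limit untouched.
        return a.duplicate().equals(b.duplicate());
    }

    /** Hash code consistent with bytewiseEquals. */
    static int bytewiseHashCode(ByteBuffer buf) {
        return buf == null ? 0 : buf.duplicate().hashCode();
    }

    public static void main(String[] args) {
        byte[] payload = "batch".getBytes(StandardCharsets.UTF_8);
        ByteBuffer heap = ByteBuffer.wrap(payload);
        ByteBuffer direct = ByteBuffer.allocateDirect(payload.length);
        direct.put(payload).flip();
        // Same bytes, different backing storage.
        System.out.println(bytewiseEquals(heap, direct));
    }
}
```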
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r454664545
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
Review comment:
That's fair. I'm ok to leave it as is.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456692033
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Would `org.apache.kafka.common.utils.ByteBufferOutputStream` be useful here?
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r458382935
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
I increased the initial buffer size to 64 and also added a 2x growth factor for the buffer. It occurred to me that the initial size only really helps for the first partition's header fields; beyond that (since we are reusing/growing the same ByteBufferOutputStream) we don't know what we'll need. The JMH benchmark did confirm that 2x was more performant than 1.1x for FetchResponse.
Existing usages of ByteBufferOutputStream were not modified and still use 1.1x.
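To give a feel for why the growth factor matters, the sketch below counts how many reallocations a simplified growable buffer needs to reach a target size. This is a toy model, assuming the buffer simply multiplies its capacity on each growth; it is not the actual `ByteBufferOutputStream` logic and it does not reproduce the JMH measurement.

```java
final class GrowthFactorSketch {
    private GrowthFactorSketch() { }

    /** Counts the reallocations needed to grow from initialCapacity to at least targetSize. */
    static int reallocations(int initialCapacity, int targetSize, double factor) {
        int capacity = initialCapacity;
        int count = 0;
        while (capacity < targetSize) {
            // Grow by at least one byte so the loop always terminates.
            capacity = Math.max(capacity + 1, (int) (capacity * factor));
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int target = 64 * 1024;
        System.out.println("1.1x: " + reallocations(64, target, 1.1));
        System.out.println("2.0x: " + reallocations(64, target, 2.0));
    }
}
```

Each reallocation copies the bytes written so far, so fewer growth steps mean less copying, at the cost of more unused capacity.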
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457416269
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteBufferOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+ private int mark;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ this.mark = 0;
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ try {
+ output.writeByte(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeShort(short val) {
+ try {
+ output.writeShort(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeInt(int val) {
+ try {
+ output.writeInt(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeLong(long val) {
+ try {
+ output.writeLong(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ try {
+ ByteUtils.writeDouble(val, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ try {
+ output.write(arr);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ try {
+ ByteUtils.writeUnsignedVarint(i, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ try {
+ output.write(src.array(), src.position(), src.remaining());
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBuffer buf = byteArrayOutputStream.buffer();
+ int end = buf.position();
+ int len = end - mark;
+
+ if (len > 0) {
+ buf.position(mark);
+ ByteBuffer slice = buf.slice();
+ slice.limit(len);
+ ByteBufferSend send = new ByteBufferSend(dest, slice);
+ sendConsumer.accept(send);
+ }
+
+ buf.position(end);
+ mark = end;
Review comment:
I ended up having to keep a separate mark here since ByteBufferOutputStream doesn't keep the mark when it replaces the underlying buffer.
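The situation can be sketched with a plain `ByteBuffer` that is replaced when it runs out of space. This is a simplified stand-in written for illustration, not the PR's RecordsWriter and not `ByteBufferOutputStream`: the writer tracks the flush boundary in its own `mark` field, so the boundary stays valid after the underlying buffer has been swapped for a larger one.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class MarkedFlushSketch {
    private ByteBuffer buf = ByteBuffer.allocate(4); // deliberately tiny to force growth
    private int mark = 0;                            // start of the bytes not yet flushed
    private final List<ByteBuffer> flushed = new ArrayList<>();

    void writeInt(int value) {
        ensureRemaining(Integer.BYTES);
        buf.putInt(value);
    }

    private void ensureRemaining(int needed) {
        if (buf.remaining() >= needed) return;
        int newCapacity = Math.max(buf.capacity() * 2, buf.position() + needed);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        buf.flip();
        bigger.put(buf);
        // The old buffer object is discarded here, along with any mark set on it.
        buf = bigger;
    }

    /** Emits the bytes written since the last flush as a read-only-by-convention slice. */
    void flush() {
        int end = buf.position();
        int len = end - mark;
        if (len > 0) {
            ByteBuffer view = buf.duplicate();
            view.position(mark);
            view.limit(end);
            flushed.add(view.slice());
        }
        mark = end;
    }

    List<ByteBuffer> flushed() {
        return flushed;
    }

    public static void main(String[] args) {
        MarkedFlushSketch writer = new MarkedFlushSketch();
        writer.writeInt(10);
        writer.flush();
        writer.writeInt(20); // triggers a buffer replacement
        writer.writeInt(30); // triggers another one
        writer.flush();
        System.out.println(writer.flushed().size());
    }
}
```

Slices handed out before a replacement keep pointing at the old backing array, so they remain readable after the writer has moved on to a larger buffer.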
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
Let's open a jira for getting rid of the toPartitionDataMap if we don't address it in this PR. It's a pretty large part of the cost here and there are only a few places we would have to deal with it.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456539488
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
Review comment:
I believe this would only get called when we're writing an int8 field, but you're right that these lambdas could create unneeded object allocations. I'll rewrite them as simple try/catch statements.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453849419
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
No, I don't think they are designed to be compared. My main question was whether we can compare the same type (MemoryRecords to MemoryRecords). I think it should work in the case of `Objects.equals` since it first checks whether the instances are the same. I don't think we have any use cases where we have equivalent instances of records that are actually separate objects.
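The behavior being relied on here can be seen in a small sketch. `OpaqueRecords` is a hypothetical class that does not override `equals`, used only to illustrate the point; it says nothing about how the real records classes implement equality.

```java
import java.util.Objects;

final class IdentityEqualsSketch {
    /** Hypothetical stand-in for a type that inherits Object.equals (identity comparison). */
    static final class OpaqueRecords {
        final byte[] payload;
        OpaqueRecords(byte[] payload) {
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        OpaqueRecords r1 = new OpaqueRecords(new byte[] {1, 2});
        OpaqueRecords r2 = new OpaqueRecords(new byte[] {1, 2});

        // Same instance: Objects.equals short-circuits on reference equality.
        System.out.println(Objects.equals(r1, r1)); // prints true

        // Separate instances with identical content: falls back to Object.equals.
        System.out.println(Objects.equals(r1, r2)); // prints false
    }
}
```

The second case is the one the bytewise comparison requested in review would change.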
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658955696
> Added some basic jmh benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests are using 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to try and better isolate the serialization time.
>
> On `trunk`:
>
> ```
> Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
> FetchRequestBenchmark.testConstructFetchRequest 20 1000 avgt 30 3.591 ± 0.046 ns/op
> FetchRequestBenchmark.testSerializeFetchRequest 20 1000 avgt 30 10049872.274 ± 440324.738 ns/op
> FetchResponseBenchmark.testConstructFetchResponse 20 1000 avgt 30 1.911 ± 0.018 ns/op
> FetchResponseBenchmark.testSerializeFetchResponse 20 1000 avgt 30 13693835.230 ± 150935.356 ns/op
> ```
>
> On this branch:
>
> ```
> Benchmark                                          (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
> FetchRequestBenchmark.testConstructFetchRequest                  20          1000  avgt   30  4809813.661 ± 190773.702  ns/op
> FetchRequestBenchmark.testSerializeFetchRequest                  20          1000  avgt   30  4646758.697 ± 551449.969  ns/op
> FetchResponseBenchmark.testConstructFetchResponse                20          1000  avgt   30  2507813.886 ±  17457.127  ns/op
> FetchResponseBenchmark.testSerializeFetchResponse                20          1000  avgt   30  7231935.691 ± 461221.717  ns/op
> ```
>
> As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to conversion to existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
>
> FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.
Nice improvement! Could you please rerun them both with `./jmh.sh -prof gc`? We should make sure that we are not increasing our garbage generation.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576
Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.
On this branch:
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer 3 10 avgt 15 2110.741 ± 27.935 ns/op
FetchRequestBenchmark.testFetchRequestForReplica 3 10 avgt 15 2021.114 ± 7.816 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer 3 10 avgt 15 3452.799 ± 16.013 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica 3 10 avgt 15 3691.157 ± 60.260 ns/op
GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 4295.532 ± 56.061 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 9984.000 ± 0.001 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 4292.525 ± 56.341 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9977.037 ± 28.311 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.187 ± 0.027 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.435 ± 0.060 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 2335.000 counts
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 3 10 avgt 15 1375.000 ms
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 4416.855 ± 16.429 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 9832.000 ± 0.001 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 4417.032 ± 24.858 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9832.358 ± 28.932 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.186 ± 0.015 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.415 ± 0.033 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 2280.000 counts
FetchRequestBenchmark.testFetchRequestForReplica:·gc.time 3 10 avgt 15 1376.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 3256.172 ± 15.524 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 12384.000 ± 0.001 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 3255.019 ± 21.484 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 12379.587 ± 49.161 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.122 ± 0.022 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.462 ± 0.084 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2054.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1389.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 3319.965 ± 53.427 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 13496.000 ± 0.001 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 3320.125 ± 52.812 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 13496.813 ± 64.774 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.126 ± 0.021 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.512 ± 0.085 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2122.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time
```
On trunk:
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer 3 10 avgt 15 3.457 ± 0.016 ns/op
FetchRequestBenchmark.testFetchRequestForReplica 3 10 avgt 15 3.453 ± 0.035 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer 3 10 avgt 15 13214.306 ± 61.158 ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica 3 10 avgt 15 13147.870 ± 52.318 ns/op
GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 ≈ 0 counts
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 ≈ 0 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 1795.576 ± 8.351 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 1796.108 ± 11.527 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26143.702 ± 100.832 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.163 ± 0.019 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.366 ± 0.270 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2134.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1412.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 1804.695 ± 7.193 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 1805.666 ± 7.990 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26150.127 ± 86.455 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.166 ± 0.016 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.406 ± 0.238 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2097.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time
```
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456662850
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2397,6 +2460,9 @@ private String fieldDefault(FieldSpec field) {
headerGenerator.addImport(MessageGenerator.BYTES_CLASS);
return "Bytes.EMPTY";
}
+ } else if (field.type().isRecords()) {
+ // TODO should we use some special EmptyRecords class instead?
Review comment:
Using null as the default seems reasonable to me. We also have `MemoryRecords.EMPTY` which we could use.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456692033
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Would `ByteBufferOutputStream` be useful here?
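For context on the question above: `ByteArrayOutputStream.toByteArray()` returns a newly allocated copy of the buffered bytes, so each `flush()` in the quoted code pays for one extra copy before wrapping. The sketch below uses only JDK classes to show that copy. `DirectBufferSink` is a hypothetical stand-in written for this example, not Kafka's `ByteBufferOutputStream`, whose behaviour may differ.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class FlushCopySketch {
    /** Hypothetical minimal sink that writes straight into a ByteBuffer (fixed capacity for brevity). */
    static final class DirectBufferSink {
        private final ByteBuffer buffer;

        DirectBufferSink(int capacity) {
            this.buffer = ByteBuffer.allocate(capacity);
        }

        void write(byte[] src) {
            buffer.put(src);
        }

        /** Returns a readable view over the same backing array, without copying. */
        ByteBuffer readableView() {
            ByteBuffer view = buffer.duplicate();
            view.flip();
            return view;
        }

        byte[] backingArray() {
            return buffer.array();
        }
    }

    /** True if two toByteArray() calls return distinct arrays, i.e. each call copies. */
    static boolean toByteArrayCopies() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(new byte[] {1, 2, 3}, 0, 3);
        return out.toByteArray() != out.toByteArray();
    }

    /** True if the readable view shares the sink's backing array and exposes the written bytes. */
    static boolean sinkSharesArray() {
        DirectBufferSink sink = new DirectBufferSink(16);
        sink.write(new byte[] {1, 2, 3});
        ByteBuffer view = sink.readableView();
        return view.array() == sink.backingArray() && view.remaining() == 3;
    }

    public static void main(String[] args) {
        System.out.println("toByteArray copies: " + toByteArrayCopies());
        System.out.println("buffer view shares array: " + sinkSharesArray());
    }
}
```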
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
@mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
```java
private void ensureCapacity(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = buf.length;
    int minGrowth = minCapacity - oldCapacity;
    if (minGrowth > 0) {
        buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                minGrowth, oldCapacity /* preferred growth */));
    }
}
```
The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` growth makes sense if we do have a reasonable estimate (which is the case in current usage, I believe).
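A rough sketch of the trade-off described above, counting how many reallocations a buffer needs to reach a target size under doubling versus a 1.1 growth factor. This is a simplified model written for illustration: it ignores the overflow handling in `ArraysSupport.newLength`, and the `reallocations` helper and the sizes used are assumptions, not Kafka or JDK code.

```java
public class GrowthSketch {
    /** Counts the reallocations needed to grow from initialCapacity to at least target. */
    static int reallocations(int initialCapacity, int target, double factor) {
        int capacity = initialCapacity;
        int count = 0;
        while (capacity < target) {
            // Always grow by at least one byte so small capacities still make progress.
            capacity = Math.max(capacity + 1, (int) (capacity * factor));
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int target = 1 << 20; // 1 MiB

        // No size estimate: start small and compare the two growth factors.
        System.out.println("doubling from 32 bytes:   " + reallocations(32, target, 2.0));
        System.out.println("1.1x from 32 bytes:       " + reallocations(32, target, 1.1));

        // Reasonable estimate: start close to the target, where 1.1x wastes less memory.
        System.out.println("1.1x from 1,000,000 bytes: " + reallocations(1_000_000, target, 1.1));
    }
}
```

With no estimate, the 1.1 factor needs many more reallocations (each one a copy) than doubling. With a good estimate, a single small step is enough and over-allocation stays low.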
----------------------------------------------------------------
[GitHub] [kafka] dajac commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r454866708
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
@mumrah Have we considered dropping the `PartitionData` class entirely in favour of using `FetchRequestData.FetchPartition` directly in the broker? The main difference is that `FetchPartition` does not have an `Optional` for the leader epoch but returns the default value (-1) instead.
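The difference between the two representations can be sketched as follows. The constant mirrors the -1 default mentioned above, and the helper names `toOptional` and `toSentinel` are hypothetical, introduced only for this example.

```java
import java.util.Optional;

public class LeaderEpochSketch {
    // Assumed sentinel, matching the default value (-1) mentioned in the comment above.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    /** Sentinel to Optional: the same filter pattern as in the quoted diff. */
    static Optional<Integer> toOptional(int currentLeaderEpoch) {
        return Optional.of(currentLeaderEpoch)
            .filter(epoch -> epoch != NO_PARTITION_LEADER_EPOCH);
    }

    /** Optional back to sentinel, as a caller would need when using the raw field directly. */
    static int toSentinel(Optional<Integer> leaderEpoch) {
        return leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH);
    }

    public static void main(String[] args) {
        System.out.println(toOptional(NO_PARTITION_LEADER_EPOCH).isPresent());
        System.out.println(toOptional(5).isPresent());
        System.out.println(toSentinel(Optional.empty()));
    }
}
```

Using the generated class directly would push the `-1` check onto each caller, while the wrapper keeps it in one place at the cost of an extra object per partition.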
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453846799
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
Review comment:
Are you suggesting a combined records reader+writer? `ByteBufferAccessor` is both `Readable` and `Writable`.
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455518434
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Changing our `TopicPartition.hashCode` method massively improves the benchmark times, so I think the current benchmark results aren't really representative.
```
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -46,10 +46,7 @@ public final class TopicPartition implements Serializable {
public int hashCode() {
if (hash != 0)
return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + partition;
- result = prime * result + Objects.hashCode(topic);
+ int result = Objects.hash(topic, partition);
this.hash = result;
return result;
}
```
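The thread does not say why the change helps. One plausible explanation, offered here as an assumption rather than a measured result, is that the old formula produces many colliding hash codes for the benchmark's key pattern (`topic-%04d` names with small partition numbers). The sketch below re-implements both formulas outside of `TopicPartition` and counts distinct hash codes for 1000 topics with 20 partitions each. Distinct hash codes are only a proxy for `HashMap` bucket distribution.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

public class TopicPartitionHashSketch {
    /** The formula removed in the diff above: partition is mixed in before the topic hash. */
    static int oldHash(String topic, int partition) {
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + Objects.hashCode(topic);
        return result;
    }

    /** The formula added in the diff above. */
    static int newHash(String topic, int partition) {
        return Objects.hash(topic, partition);
    }

    /** Counts distinct hash codes over the benchmark's key pattern. */
    static int distinctHashes(boolean useNew, int topicCount, int partitionCount) {
        Set<Integer> seen = new HashSet<>();
        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
            // Locale.ROOT keeps the digits ASCII regardless of the default locale.
            String topic = String.format(Locale.ROOT, "topic-%04d", topicIdx);
            for (int partition = 0; partition < partitionCount; partition++) {
                seen.add(useNew ? newHash(topic, partition) : oldHash(topic, partition));
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        int total = 1000 * 20;
        System.out.println("keys:                " + total);
        System.out.println("distinct (old hash): " + distinctHashes(false, 1000, 20));
        System.out.println("distinct (new hash): " + distinctHashes(true, 1000, 20));
    }
}
```

In the old formula the partition and the trailing digits of the topic name both contribute at the same scale, so different (topic, partition) pairs can land on the same value. Multiplying the topic hash by 31 before adding the partition separates them for this key pattern.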
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456225427
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
That would mean loading data from disk to compute equals and hashCode for FileRecords. That's pretty unusual for such methods.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456197601
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
This creates a copy of the underlying bytes. Can we avoid it?
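To make the concern concrete, here is a small standalone sketch (not part of the PR; the class name `ToByteArrayCopyDemo` is made up for illustration). It relies only on the documented JDK behavior that `ByteArrayOutputStream.toByteArray()` returns a newly allocated array, so wrapping its result in a `ByteBuffer` costs one copy of the buffered bytes per flush:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Illustrative only: shows that toByteArray() hands back a detached copy.
class ToByteArrayCopyDemo {
    // True if two consecutive toByteArray() calls return different array instances.
    static boolean returnsFreshCopy() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(42);
        byte[] first = out.toByteArray();
        byte[] second = out.toByteArray();
        return first != second && first.length == 1 && second[0] == 42;
    }

    // True if mutating the wrapped copy leaves the stream's own contents untouched.
    static boolean wrappedBufferIsDetached() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(1);
        ByteBuffer wrapped = ByteBuffer.wrap(out.toByteArray());
        wrapped.put(0, (byte) 9);
        return out.toByteArray()[0] == 1;
    }

    public static void main(String[] args) {
        System.out.println(returnsFreshCopy());
        System.out.println(wrappedBufferIsDetached());
    }
}
```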
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455518434
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Changing our hashCode method massively improves the benchmark times so I think the current benchmark results aren't really representative.
```
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -46,10 +46,7 @@ public final class TopicPartition implements Serializable {
public int hashCode() {
if (hash != 0)
return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + partition;
- result = prime * result + Objects.hashCode(topic);
+ int result = Objects.hash(topic, partition);
this.hash = result;
return result;
}
```
Edit: it looks like the main difference here is ordering by topic and then partition which seems to avoid the collisions for this reasonably pathological case. Maybe we can just change the test case.
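As a rough way to check the ordering explanation, the sketch below counts distinct hash values for the benchmark's key set (1000 topics named `topic-%04d`, 20 partitions each) under both formulas from the diff. The class and method names are hypothetical. The number of distinct hash values is only a proxy for `HashMap` bucket collisions, since `HashMap` applies further bit mixing and masking on top:

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;

// Illustrative only: compares the two hashCode formulas from the diff above.
class TopicPartitionHashDemo {
    // Formula removed by the diff: partition mixed in first, topic last.
    static int oldHash(String topic, int partition) {
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + Objects.hashCode(topic);
        return result;
    }

    // Formula added by the diff: topic first, then partition.
    static int newHash(String topic, int partition) {
        return Objects.hash(topic, partition);
    }

    // Counts distinct hash values over topicCount x partitionCount keys.
    static int distinct(boolean useOld, int topicCount, int partitionCount) {
        Set<Integer> seen = new HashSet<>();
        for (int t = 0; t < topicCount; t++) {
            String topic = String.format(Locale.ROOT, "topic-%04d", t);
            for (int p = 0; p < partitionCount; p++) {
                seen.add(useOld ? oldHash(topic, p) : newHash(topic, p));
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("old formula: " + distinct(true, 1000, 20));
        System.out.println("new formula: " + distinct(false, 1000, 20));
    }
}
```

For these 20,000 keys the old formula should produce 2,900 distinct values and the new one 20,000. With the old ordering, partition `p` of `topic-N` hashes the same as partition `p - 1` of `topic-(N + 10)` whenever the tens digit does not carry, which is why sequential topic names hit it so hard.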
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456092037
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.protocol.ApiMessage;
+
public interface AbstractRequestResponse {
+ /**
+ * Return the auto-generated `Message` instance if this request/response relies on one for
+ * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+ default ApiMessage data() {
Review comment:
Perhaps instead we could add this to a mixin type. Then if we find cases where getting access to the `ApiMessage` generally would be useful, we could just use `instanceof` checks. These would ultimately go away after the conversions are finished.
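A minimal sketch of what the mixin could look like, assuming a hypothetical interface name `ApiMessageHolder`. All types below are stand-ins declared locally so the example compiles on its own; they are not the real Kafka classes:

```java
import java.util.Optional;

// Stand-in for org.apache.kafka.common.protocol.ApiMessage (assumption, simplified).
interface ApiMessage {
    short apiKey();
}

// Hypothetical mixin: implemented only by requests/responses already converted
// to the generated protocol classes.
interface ApiMessageHolder {
    ApiMessage data();
}

// Stand-in for the existing marker interface, left without a data() method.
interface AbstractRequestResponse {
}

// A converted type opts in by implementing the mixin.
class ConvertedRequest implements AbstractRequestResponse, ApiMessageHolder {
    @Override
    public ApiMessage data() {
        return () -> (short) 1; // placeholder api key for the sketch
    }
}

// An unconverted type simply does not implement the mixin.
class LegacyRequest implements AbstractRequestResponse {
}

class MixinDemo {
    // Callers use an instanceof check instead of a null-returning default method.
    static Optional<ApiMessage> messageOf(AbstractRequestResponse r) {
        if (r instanceof ApiMessageHolder) {
            return Optional.of(((ApiMessageHolder) r).data());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(messageOf(new ConvertedRequest()).isPresent());
        System.out.println(messageOf(new LegacyRequest()).isPresent());
    }
}
```

Once every request and response is converted, the mixin and the `instanceof` checks could be folded back into the base type.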
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656988853
I agree that it’d be great to have a benchmark on both the request and response side.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456727715
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Looks like the expansion factor for ByteArrayOutputStream varies with the JDK version. In [JDK 8](https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/share/classes/java/io/ByteArrayOutputStream.java#L105) and [11](https://github.com/openjdk/jdk/blob/jdk-11+28/src/java.base/share/classes/java/io/ByteArrayOutputStream.java) it's 2x, but in [JDK 14](https://github.com/openjdk/jdk/blob/jdk-14+36/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L95) it just grows the buffer to the minimum needed size.
Our growth factor of 1.1 in ByteBufferOutputStream seems reasonable. Not to mention, avoiding the final copy by using slice would be nice too.
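For illustration, here is a rough sketch of the slice idea. `GrowableBuffer` is a hypothetical, simplified stand-in and not Kafka's `ByteBufferOutputStream`; the 1.1 factor is taken from the comment above, everything else is an assumption. Flushing returns a view over the bytes already written and continues writing in the remaining space, so no copy happens at flush time (a copy still happens when the buffer has to grow):

```java
import java.nio.ByteBuffer;

// Illustrative only: a growable buffer whose flush() returns a view, not a copy.
class GrowableBuffer {
    private static final double GROWTH_FACTOR = 1.1;
    private ByteBuffer buffer;

    GrowableBuffer(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    void writeInt(int val) {
        ensureRemaining(Integer.BYTES);
        buffer.putInt(val);
    }

    // Grows to max(required size, capacity * 1.1) when space runs out.
    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed) {
            return;
        }
        int required = buffer.position() + needed;
        int grown = (int) (buffer.capacity() * GROWTH_FACTOR);
        ByteBuffer bigger = ByteBuffer.allocate(Math.max(required, grown));
        buffer.flip();
        bigger.put(buffer); // the only copy: pending bytes move to the new array
        buffer = bigger;
    }

    // Returns the pending bytes as a read view and keeps writing after them.
    ByteBuffer flush() {
        ByteBuffer view = buffer.duplicate();
        view.flip();             // view now covers [0, position)
        buffer = buffer.slice(); // writer continues in the unused tail
        return view;
    }
}
```

A short usage example: after `writeInt(10)` and `flush()`, later writes land in a disjoint region (or a new array after growth), so the earlier view stays valid.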
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665938546
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455342535
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Can you also try rerunning the benchmark with random topic names, e.g. `UUID.randomUUID().toString()` and compare it to the existing topic names? I think our hashCode implementation sucks and we are seeing a lot of collisions.
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455318129
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Can we measure both forConsumer and forReplica fetch requests?
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
Let's open a jira for getting rid of the toPartitionDataMap if we don't address it in this PR. It's a pretty large part of the cost here and there are only a few places we would have to deal with it. I think we should fix it sooner rather than later too.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658918847
Added some basic jmh benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests are using 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to try and isolate the serialization time.
On `trunk`:
```
Benchmark                                          (partitionCount)  (topicCount)  Mode  Cnt         Score        Error  Units
FetchRequestBenchmark.testConstructFetchRequest                  20          1000  avgt   30         3.591 ±      0.046  ns/op
FetchRequestBenchmark.testSerializeFetchRequest                  20          1000  avgt   30  10049872.274 ± 440324.738  ns/op
FetchResponseBenchmark.testConstructFetchResponse                20          1000  avgt   30         1.911 ±      0.018  ns/op
FetchResponseBenchmark.testSerializeFetchResponse                20          1000  avgt   30  13693835.230 ± 150935.356  ns/op
```
On this branch:
```
Benchmark                                          (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
FetchRequestBenchmark.testConstructFetchRequest                  20          1000  avgt   30  4809813.661 ± 190773.702  ns/op
FetchRequestBenchmark.testSerializeFetchRequest                  20          1000  avgt   30  4646758.697 ± 551449.969  ns/op
FetchResponseBenchmark.testConstructFetchResponse                20          1000  avgt   30  2507813.886 ±  17457.127  ns/op
FetchResponseBenchmark.testSerializeFetchResponse                20          1000  avgt   30  7231935.691 ± 461221.717  ns/op
```
As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to conversion to existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.
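For reference, the totals can be worked out directly from the scores above. This is plain addition of the reported averages (error bars ignored, laptop numbers, so only indicative); the class name is made up for the sketch:

```java
// Illustrative only: sums the construct and serialize scores reported above.
class FetchBenchmarkTotals {
    static double total(double constructNs, double serializeNs) {
        return constructNs + serializeNs;
    }

    static double percentChange(double before, double after) {
        return (after - before) / before * 100.0;
    }

    public static void main(String[] args) {
        double requestTrunk = total(3.591, 10049872.274);
        double requestBranch = total(4809813.661, 4646758.697);
        double responseTrunk = total(1.911, 13693835.230);
        double responseBranch = total(2507813.886, 7231935.691);

        System.out.printf("FetchRequest:  %.0f -> %.0f ns/op (%.1f%%)%n",
                requestTrunk, requestBranch, percentChange(requestTrunk, requestBranch));
        System.out.printf("FetchResponse: %.0f -> %.0f ns/op (%.1f%%)%n",
                responseTrunk, responseBranch, percentChange(responseTrunk, responseBranch));
    }
}
```

That works out to roughly 10.05 ms versus 9.46 ms per FetchRequest (about 6% lower) and roughly 13.69 ms versus 9.74 ms per FetchResponse (about 29% lower).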
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455518434
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Changing our hashCode method massively improves our results so I think the current benchmark results aren't really representative.
```
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -46,10 +46,7 @@ public final class TopicPartition implements Serializable {
public int hashCode() {
if (hash != 0)
return hash;
- final int prime = 31;
- int result = 1;
- result = prime * result + partition;
- result = prime * result + Objects.hashCode(topic);
+ int result = Objects.hash(topic, partition);
this.hash = result;
return result;
}
```
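One way to see why the two schemes behave so differently for this benchmark's keys is to count distinct hash codes over the same key space. The sketch below is only an illustration: the class and method names are hypothetical, and it copies the two hash formulas from the diff above rather than calling `TopicPartition` itself.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

final class TopicPartitionHashSketch {
    // Old scheme from the diff above: the partition is mixed in first, then the topic hash.
    static int oldHash(String topic, int partition) {
        final int prime = 31;
        int result = 1;
        result = prime * result + partition;
        result = prime * result + Objects.hashCode(topic);
        return result;
    }

    // Proposed scheme from the diff above: the topic hash is mixed in first, then the partition.
    static int newHash(String topic, int partition) {
        return Objects.hash(topic, partition);
    }

    // Counts distinct hash codes over the same key space the benchmark's setup() builds.
    static int distinctHashes(int topicCount, int partitionCount, boolean useNew) {
        Set<Integer> seen = new HashSet<>();
        for (int t = 0; t < topicCount; t++) {
            String topic = String.format("topic-%04d", t);
            for (int p = 0; p < partitionCount; p++) {
                seen.add(useNew ? newHash(topic, p) : oldHash(topic, p));
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println("old scheme, distinct hashes: " + distinctHashes(1000, 20, false));
        System.out.println("new scheme, distinct hashes: " + distinctHashes(1000, 20, true));
    }
}
```

With sequentially numbered topic names, the old scheme adds `31 * partition` in the same range as the second-to-last digit of the topic name, so many of the 20,000 keys share a hash code. The new scheme multiplies the topic hash by 31 before adding the partition, which keeps these particular keys apart.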
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455361855
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
Review comment:
Let's add a benchmark param with a smaller topic or partition count. 20,000 partitions (1000 topics x 20 partitions) in a single fetch request is larger than we would normally see :)
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453095731
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
Review comment:
Probably better to save for a follow-up, but potentially we can get rid of this conversion by using `FetchablePartitionResponse` directly in the broker.
##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -49,41 +49,41 @@
"fields": [
{ "name": "ReplicaId", "type": "int32", "versions": "0+",
"about": "The broker ID of the follower, of -1 if this request is from a consumer." },
- { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
+ { "name": "MaxWaitTime", "type": "int32", "versions": "0+",
Review comment:
Can we revert some of these renamings? We intentionally changed them in #8802.
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -209,30 +210,21 @@
FORGOTTEN_TOPIC_DATA_V7,
RACK_ID);
Review comment:
Similarly, we can get rid of all this.
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.fetchResponseData = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return fetchResponseData.toStruct(version);
}
@Override
protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ RecordsWriter writer = new RecordsWriter(dest, sends::add);
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ fetchResponseData.size(cache, apiVersion);
+ fetchResponseData.write(writer, cache, apiVersion);
+ writer.flush();
+
+ // Compute the total size of all the Sends and write it out along with the header in the first Send
+ ResponseHeaderData responseHeaderData = responseHeader.data();
+
+ //Struct responseHeaderStruct = responseHeader.toStruct();
Review comment:
nit: I guess we didn't need this?
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
SESSION_ID,
new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
-
- public static Schema[] schemaVersions() {
- return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
- FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
- FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10,
- FETCH_RESPONSE_V11};
- }
-
public static final long INVALID_HIGHWATERMARK = -1L;
public static final long INVALID_LAST_STABLE_OFFSET = -1L;
public static final long INVALID_LOG_START_OFFSET = -1L;
public static final int INVALID_PREFERRED_REPLICA_ID = -1;
- private final int throttleTimeMs;
- private final Errors error;
- private final int sessionId;
- private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
+ private final FetchResponseData fetchResponseData;
Review comment:
nit: in all of the other classes, we just use the name `data`. Can we do the same here?
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
SESSION_ID,
new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
Review comment:
We can get rid of all the stuff above too, right?
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
I don't think `FileRecords` and `MemoryRecords` instances can be compared directly, if that's what the question is about.
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
Review comment:
Is it worth extending `ByteBufferAccessor` or not?
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.fetchResponseData = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return fetchResponseData.toStruct(version);
}
@Override
protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ RecordsWriter writer = new RecordsWriter(dest, sends::add);
Review comment:
Pretty nice if this is all the manual code we need. If we wanted to go a little further, we could push `toSend` into the generated class as well. That will be necessary if we ever want to get rid of the current `AbstractRequest` and `AbstractResponse` types and replace them with the generated data classes (which was always the plan). However, I think this could be left for follow-up work.
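To make the flow in the `toSend` hunk above concrete, here is a toy model of a writer that buffers ordinary fields and hands each record set to a consumer as its own chunk. It is a sketch under stated assumptions, not the PR's `RecordsWriter`: `Chunk` and `SketchWriter` are hypothetical names, and a plain `byte[]` stands in for a record set.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class InterleavedSendSketch {
    // Hypothetical stand-in for a Send: either buffered field bytes or a record set.
    static final class Chunk {
        final String kind; // "bytes" or "records"
        final int size;

        Chunk(String kind, int size) {
            this.kind = kind;
            this.size = size;
        }
    }

    // Toy writer: buffers primitive fields and emits them as one chunk before each record set.
    static final class SketchWriter {
        private final Consumer<Chunk> consumer;
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private final DataOutputStream out = new DataOutputStream(pending);

        SketchWriter(Consumer<Chunk> consumer) {
            this.consumer = consumer;
        }

        void writeInt(int value) {
            try {
                out.writeInt(value);
            } catch (IOException e) {
                throw new RuntimeException("unexpected IO error on in-memory stream", e);
            }
        }

        // Flush pending field bytes first so ordering on the wire is preserved.
        void writeRecords(byte[] records) {
            flush();
            consumer.accept(new Chunk("records", records.length));
        }

        void flush() {
            if (pending.size() > 0) {
                consumer.accept(new Chunk("bytes", pending.size()));
                pending.reset();
            }
        }
    }

    static List<Chunk> demo() {
        List<Chunk> chunks = new ArrayList<>();
        SketchWriter writer = new SketchWriter(chunks::add);
        writer.writeInt(10);
        writer.writeRecords(new byte[100]);
        writer.writeInt(20);
        writer.writeRecords(new byte[200]);
        writer.writeInt(30);
        writer.flush(); // without this, the trailing int would never reach the consumer
        return chunks;
    }

    public static void main(String[] args) {
        for (Chunk chunk : demo()) {
            System.out.println(chunk.kind + " " + chunk.size);
        }
    }
}
```

The only point of the model is the ordering guarantee: field bytes written before a record set are emitted before it, and the final `flush()` is the caller's responsibility.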
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456449035
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
}
public boolean isFromFollower() {
- return replicaId >= 0;
+ return replicaId() >= 0;
}
public IsolationLevel isolationLevel() {
- return isolationLevel;
+ return IsolationLevel.forId(data.isolationLevel());
}
public FetchMetadata metadata() {
return metadata;
}
public String rackId() {
- return rackId;
+ return data.rackId();
}
public static FetchRequest parse(ByteBuffer buffer, short version) {
- return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+ ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+ FetchRequestData message = new FetchRequestData();
+ message.read(accessor, version);
+ return new FetchRequest(message, version);
+ }
+
+ @Override
+ public ByteBuffer serialize(RequestHeader header) {
Review comment:
Indeed this is generic serialization code for the message classes. If we go with a mixin interface to indicate a class has been converted over to generated messages, we could also push this up to AbstractRequest. However, this might be better saved for a follow-on since we'll probably want to pick up additional changes from @ijuma's PR. Thoughts?
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456730580
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2397,6 +2460,9 @@ private String fieldDefault(FieldSpec field) {
headerGenerator.addImport(MessageGenerator.BYTES_CLASS);
return "Bytes.EMPTY";
}
+ } else if (field.type().isRecords()) {
+ // TODO should we use some special EmptyRecords class instead?
Review comment:
Ok, going to leave it as `null` since we need to deal with FileRecords and MemoryRecords in different cases.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456195934
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
Review comment:
This seems pretty inefficient (creating a lambda for each `byte` we write). Are we sure about it?
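For comparison, the two styles in question can be put side by side. This is a minimal sketch with hypothetical names (`IOAction`, `WriteQuietlySketch`); it only shows that both styles write the same bytes, and makes no measured claim about cost. A capturing lambda may allocate an object per call unless the JIT manages to remove it, which is the concern raised above.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class WriteQuietlySketch {
    // Hypothetical functional interface for an IO action that may throw.
    interface IOAction {
        void run() throws IOException;
    }

    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream output = new DataOutputStream(bytes);

    // Wrapper style: every call builds a lambda that captures `val`.
    void writeByteViaLambda(byte val) {
        writeQuietly(() -> output.writeByte(val));
    }

    private void writeQuietly(IOAction action) {
        try {
            action.run();
        } catch (IOException e) {
            throw new RuntimeException("unexpected IO error", e);
        }
    }

    // Direct style: same behavior, with the try/catch written out in each method.
    void writeByteDirect(byte val) {
        try {
            output.writeByte(val);
        } catch (IOException e) {
            throw new RuntimeException("unexpected IO error", e);
        }
    }

    byte[] toByteArray() {
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        WriteQuietlySketch sketch = new WriteQuietlySketch();
        sketch.writeByteViaLambda((byte) 1);
        sketch.writeByteDirect((byte) 2);
        System.out.println(sketch.toByteArray().length);
    }
}
```

The direct style costs a few more lines per method but keeps the hot path free of any per-call wrapper object.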
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456660379
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
}
public boolean isFromFollower() {
- return replicaId >= 0;
+ return replicaId() >= 0;
}
public IsolationLevel isolationLevel() {
- return isolationLevel;
+ return IsolationLevel.forId(data.isolationLevel());
}
public FetchMetadata metadata() {
return metadata;
}
public String rackId() {
- return rackId;
+ return data.rackId();
}
public static FetchRequest parse(ByteBuffer buffer, short version) {
- return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+ ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+ FetchRequestData message = new FetchRequestData();
+ message.read(accessor, version);
+ return new FetchRequest(message, version);
+ }
+
+ @Override
+ public ByteBuffer serialize(RequestHeader header) {
Review comment:
I'm ok saving this for #7409.
----------------------------------------------------------------
[GitHub] [kafka] cmccabe commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662745982
Thanks for this, @mumrah. I took a look at the overall approach with the `RecordsWriter` and it looks reasonable.
Do we need `RecordsReader`? Seems like we could just add the `readRecords` method to `ByteBufferAccessor`.
I do think `RecordsWriter` needs to be a separate class from `ByteBufferAccessor` -- it is doing something quite different, after all. But I'm not sure that the generated code needs to know about `RecordsWriter`. If we add a `writeRecords` method to `Writable` and a simple implementation to `ByteBufferAccessor`, we can avoid the downcast in the generated code. That also suggests that maybe `RecordsWriter` could be in `org.apache.kafka.common.record`? Maybe.
It seems like `Writable` should have a `Writable#close` method, in case there's something the writable needs to do when we're done adding things. Actually it should just extend AutoCloseable so that the compiler will complain if we don't close it appropriately. Then that can be a no-op for `ByteBufferAccessor` but call flush when using `RecordsWriter`.
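The close-on-completion idea might look roughly like the following. This is a sketch only: `SketchWritable`, `BufferWritable` and `FlushingWritable` are hypothetical stand-ins, and the real `Writable` interface has many more methods than the single `writeInt` shown here.

```java
import java.util.ArrayList;
import java.util.List;

final class CloseableWritableSketch {
    // Hypothetical interface: extends AutoCloseable so it can be used in try-with-resources.
    interface SketchWritable extends AutoCloseable {
        void writeInt(int value);

        @Override
        void close(); // narrowed so callers need not handle a checked exception
    }

    // Buffer-backed writer: nothing is ever pending, so close() is a no-op.
    static final class BufferWritable implements SketchWritable {
        final List<Integer> values = new ArrayList<>();

        @Override
        public void writeInt(int value) {
            values.add(value);
        }

        @Override
        public void close() {
        }
    }

    // Send-producing writer: close() flushes whatever has not been handed off yet.
    static final class FlushingWritable implements SketchWritable {
        final List<List<Integer>> flushed = new ArrayList<>();
        private List<Integer> pending = new ArrayList<>();

        @Override
        public void writeInt(int value) {
            pending.add(value);
        }

        void flush() {
            if (!pending.isEmpty()) {
                flushed.add(pending);
                pending = new ArrayList<>();
            }
        }

        @Override
        public void close() {
            flush();
        }
    }

    // Returns the number of flushed batches after the try-with-resources block exits.
    static int demo() {
        FlushingWritable writable = new FlushingWritable();
        try (SketchWritable w = writable) {
            w.writeInt(10);
            w.writeInt(20);
        }
        return writable.flushed.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With this shape the caller can no longer forget the final flush as long as the writable is opened in a try-with-resources block.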
Using `ByteBufferOutputStream` is wasteful when you have to do a lot of doublings. When you do a doubling, you end up copying a lot of data that has already been flushed (and hence sent to the Sender). You're making a new buffer to contain it, but why? Nobody will ever read that part of the new buffer. What the Sender will read is the part of the old (pre-doubling) buffer that contained that data.
What you really want is to get rid of the ByteBufferOutputStream and just manage the buffer yourself here. Then, when you need to enlarge, you can just copy the data that's live and not the old, already flushed data.
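A rough sketch of that self-managed buffer follows. The class name, the `mark` bookkeeping and the `bytesCopied` counter are assumptions made for illustration; the sketch is not the code that was eventually merged.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class LiveRegionBufferSketch {
    private ByteBuffer buffer;
    private int mark = 0;                              // start of bytes not yet handed off
    final List<ByteBuffer> flushed = new ArrayList<>(); // slices already given to the "sender"
    long bytesCopied = 0;                              // bytes moved during growth, for illustration

    LiveRegionBufferSketch(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    void writeInt(int value) {
        ensureRemaining(Integer.BYTES);
        buffer.putInt(value);
    }

    // Hands off [mark, position) as a slice; the slice keeps referencing the current backing array.
    void flush() {
        int end = buffer.position();
        if (end > mark) {
            ByteBuffer view = buffer.duplicate();
            view.position(mark);
            view.limit(end);
            flushed.add(view.slice());
            mark = end;
        }
    }

    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed) {
            return;
        }
        int live = buffer.position() - mark;
        int newCapacity = Math.max(buffer.capacity() * 2, live + needed);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        // Copy only the unflushed bytes; already flushed slices still point at the old buffer.
        ByteBuffer liveView = buffer.duplicate();
        liveView.position(mark);
        liveView.limit(buffer.position());
        bigger.put(liveView);
        bytesCopied += live;
        buffer = bigger;
        mark = 0;
    }

    public static void main(String[] args) {
        LiveRegionBufferSketch sketch = new LiveRegionBufferSketch(8);
        sketch.writeInt(1);
        sketch.flush();
        sketch.writeInt(2);
        sketch.writeInt(3); // forces growth; only the 4 live bytes are copied
        sketch.flush();
        System.out.println("bytes copied during growth: " + sketch.bytesCopied);
    }
}
```

In the `main` scenario a stream that copies its whole contents on growth would move 8 bytes, while this version moves only the 4 bytes that have not been flushed yet.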
The above could be done in a follow-on if you want. I don't think it should block the merge.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453849419
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
No, I don't think they are designed to be compared. My main question was whether we can compare the same type (MemoryRecords to MemoryRecords). I think it should work in the case of `Objects.equals` since it first checks if the instances are the same. I don't think we have any use cases where we have equivalent instances of records that are actually separate objects.
I have a similar question about hashCode down below. Records doesn't implement either of these, but we have to include them for all fields in the generated message classes for completeness. I think it's probably fine.
@cmccabe, any insight here?
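The `Objects.equals` behavior described above can be checked in isolation. The sketch below uses a hypothetical `OpaqueRecords` class that, as assumed in the comment, does not override `equals` or `hashCode`; it is not one of Kafka's records classes.

```java
import java.util.Objects;

final class RecordsEqualitySketch {
    // Hypothetical records-like type with no equals/hashCode override.
    static final class OpaqueRecords {
        final byte[] payload;

        OpaqueRecords(byte[] payload) {
            this.payload = payload;
        }
    }

    // Same instance on both sides: Objects.equals short-circuits on reference equality.
    static boolean sameInstanceEquals() {
        OpaqueRecords records = new OpaqueRecords(new byte[] {1, 2, 3});
        return Objects.equals(records, records);
    }

    // Separate instances with identical content: falls back to Object.equals, i.e. identity.
    static boolean equalContentEquals() {
        OpaqueRecords a = new OpaqueRecords(new byte[] {1, 2, 3});
        OpaqueRecords b = new OpaqueRecords(new byte[] {1, 2, 3});
        return Objects.equals(a, b);
    }

    // hashCode is identity-based too, but stable for a given instance.
    static boolean hashIsStableForInstance() {
        OpaqueRecords records = new OpaqueRecords(new byte[] {1, 2, 3});
        return Objects.hashCode(records) == Objects.hashCode(records);
    }

    public static void main(String[] args) {
        System.out.println("same instance: " + sameInstanceEquals());
        System.out.println("equal content, separate instances: " + equalContentEquals());
    }
}
```

So generated `equals` and `hashCode` would be consistent for a message compared with itself or with a copy sharing the same records object, but two messages holding separately built records with identical bytes would compare as unequal under this assumption.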
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteBufferOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+ private int mark;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ this.mark = 0;
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ try {
+ output.writeByte(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeShort(short val) {
+ try {
+ output.writeShort(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeInt(int val) {
+ try {
+ output.writeInt(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeLong(long val) {
+ try {
+ output.writeLong(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ try {
+ ByteUtils.writeDouble(val, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ try {
+ output.write(arr);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ try {
+ ByteUtils.writeUnsignedVarint(i, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ try {
+ output.write(src.array(), src.position(), src.remaining());
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBuffer buf = byteArrayOutputStream.buffer();
+ int end = buf.position();
+ int len = end - mark;
+
+ if (len > 0) {
+ buf.position(mark);
+ ByteBuffer slice = buf.slice();
+ slice.limit(len);
+ ByteBufferSend send = new ByteBufferSend(dest, slice);
+ sendConsumer.accept(send);
+ }
+
+ buf.position(end);
+ mark = end;
Review comment:
Ended up not using ByteBufferOutputStream
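For readers following the Javadoc in the hunk above, here is a minimal sketch of how buffered writes and record sets interleave into separate sends. It assumes a plain `ByteArrayOutputStream` and uses hypothetical stand-ins (`SketchWriter`, string-labelled sends) instead of Kafka's `Send`, `ByteBufferSend`, and `BaseRecords`, so it is only an illustration of the idea, not the PR's implementation.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for RecordsWriter: "sends" are plain strings so the
// sketch runs without Kafka on the classpath.
final class SketchWriter {
    private final Consumer<String> sendConsumer;
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final DataOutputStream output = new DataOutputStream(bytes);

    SketchWriter(Consumer<String> sendConsumer) {
        this.sendConsumer = sendConsumer;
    }

    void writeInt(int val) {
        try {
            output.writeInt(val);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Flush pending non-record bytes first, then pass the records on as their own send.
    void writeRecords(String recordsName) {
        flush();
        sendConsumer.accept("records:" + recordsName);
    }

    // Emit pending bytes (if any) as one buffer send and reset the buffer.
    void flush() {
        if (bytes.size() > 0) {
            sendConsumer.accept("buffer:" + bytes.size());
            bytes.reset();
        }
    }

    // Replays the sequence from the Javadoc example and returns the sends produced.
    static List<String> demo() {
        List<String> sends = new ArrayList<>();
        SketchWriter writer = new SketchWriter(sends::add);
        writer.writeInt(10);
        writer.writeRecords("records1");
        writer.writeInt(20);
        writer.writeRecords("records2");
        writer.writeInt(30);
        writer.flush();
        return sends;
    }
}
```

Calling `SketchWriter.demo()` yields five entries, alternating buffer and records sends, which matches the count stated in the Javadoc.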
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456444794
##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -47,33 +47,35 @@
"about": "The top level response error code." },
{ "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
"about": "The fetch session ID, or 0 if this is not part of a fetch session." },
- { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+ { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
Review comment:
I was trying to keep the field names aligned with what was defined in the manual schemas, so I'm not sure we should change any other field names in this PR.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456091241
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.protocol.ApiMessage;
+
public interface AbstractRequestResponse {
+ /**
+ * Return the auto-generated `Message` instance if this request/response relies on one for
+ * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+ default ApiMessage data() {
Review comment:
I have a PR that does need this. I really need to get that over the line.
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656809268
Thanks for the PR. Since this affects the fetch path, let's make sure we benchmark this. cc @lbradstreet
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.protocol.ApiMessage;
+
public interface AbstractRequestResponse {
+ /**
+ * Return the auto-generated `Message` instance if this request/response relies on one for
+ * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+ default ApiMessage data() {
Review comment:
Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach.
##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1249,26 +1249,26 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
}
}
- if (partition.highWatermark >= 0) {
- log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
- subscriptions.updateHighWatermark(tp, partition.highWatermark);
+ if (partition.highWatermark() >= 0) {
+ log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
+ subscriptions.updateHighWatermark(tp, partition.highWatermark());
}
- if (partition.logStartOffset >= 0) {
- log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
- subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+ if (partition.logStartOffset() >= 0) {
+ log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
+ subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
}
- if (partition.lastStableOffset >= 0) {
- log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
- subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
+ if (partition.lastStableOffset() >= 0) {
+ log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
+ subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
}
- if (partition.preferredReadReplica.isPresent()) {
- subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> {
+ if (partition.preferredReadReplica().isPresent()) {
Review comment:
nit: could probably change this to use `ifPresent`
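As a rough illustration of the nit, the two styles behave the same when the body only consumes the value. The class and method names below are hypothetical and unrelated to `Fetcher`.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical helper comparing the two styles; not code from the PR.
final class IfPresentSketch {
    // Style in the diff: explicit isPresent() check followed by get().
    static void withIsPresent(Optional<Integer> replica, List<Integer> sink) {
        if (replica.isPresent()) {
            sink.add(replica.get());
        }
    }

    // Suggested style: the consumer runs only when a value is present.
    static void withIfPresent(Optional<Integer> replica, List<Integer> sink) {
        replica.ifPresent(sink::add);
    }
}
```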
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
}
public boolean isFromFollower() {
- return replicaId >= 0;
+ return replicaId() >= 0;
}
public IsolationLevel isolationLevel() {
- return isolationLevel;
+ return IsolationLevel.forId(data.isolationLevel());
}
public FetchMetadata metadata() {
return metadata;
}
public String rackId() {
- return rackId;
+ return data.rackId();
}
public static FetchRequest parse(ByteBuffer buffer, short version) {
- return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+ ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+ FetchRequestData message = new FetchRequestData();
+ message.read(accessor, version);
+ return new FetchRequest(message, version);
+ }
+
+ @Override
+ public ByteBuffer serialize(RequestHeader header) {
Review comment:
Are we overriding this so that we save the conversion to `Struct`? As far as I can tell, there's nothing specific to `FetchRequest` below. I wonder if we can move this implementation to `AbstractRequest.serialize` so that we save the conversion to Struct for all APIs that have been converted?
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
}
public boolean isFromFollower() {
- return replicaId >= 0;
+ return replicaId() >= 0;
}
public IsolationLevel isolationLevel() {
- return isolationLevel;
+ return IsolationLevel.forId(data.isolationLevel());
}
public FetchMetadata metadata() {
return metadata;
}
public String rackId() {
- return rackId;
+ return data.rackId();
}
public static FetchRequest parse(ByteBuffer buffer, short version) {
- return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+ ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
Review comment:
In the parsing logic, we still convert to struct first before calling `AbstractRequest.parseRequest`. I think we could bypass the `Struct` conversion by changing `AbstractRequest.parseRequest` to take the `ByteBuffer` instead of the `Struct`.
```java
public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {
```
Then in the fetch case, we could just call this method.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457416269
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteBufferOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+ private int mark;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ this.mark = 0;
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ try {
+ output.writeByte(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeShort(short val) {
+ try {
+ output.writeShort(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeInt(int val) {
+ try {
+ output.writeInt(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeLong(long val) {
+ try {
+ output.writeLong(val);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ try {
+ ByteUtils.writeDouble(val, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ try {
+ output.write(arr);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ try {
+ ByteUtils.writeUnsignedVarint(i, output);
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ try {
+ output.write(src.array(), src.position(), src.remaining());
+ } catch (IOException e) {
+ throw new RuntimeException("RecordsWriter encountered an IO error", e);
+ }
+ }
+
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBuffer buf = byteArrayOutputStream.buffer();
+ int end = buf.position();
+ int len = end - mark;
+
+ if (len > 0) {
+ buf.position(mark);
+ ByteBuffer slice = buf.slice();
+ slice.limit(len);
+ ByteBufferSend send = new ByteBufferSend(dest, slice);
+ sendConsumer.accept(send);
+ }
+
+ buf.position(end);
+ mark = end;
Review comment:
I ended up having to keep a separate mark here since ByteBufferOutputStream doesn't keep the mark when it replaces the underlying buffer. I also didn't want to change that class in this PR since it has quite a lot of usages. We could look into fixing that as a follow-up.
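To make the mark handling concrete, here is a minimal sketch that uses only `java.nio.ByteBuffer`. `MarkedBufferSketch` and its doubling growth are assumptions for illustration; they are not `ByteBufferOutputStream`'s actual behaviour.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a growable buffer whose flush() emits only the bytes
// written since the previous flush, tracked by an external int mark.
final class MarkedBufferSketch {
    private ByteBuffer buf = ByteBuffer.allocate(4);
    private int mark = 0;
    final List<ByteBuffer> flushed = new ArrayList<>();

    void writeInt(int val) {
        if (buf.remaining() < Integer.BYTES) {
            // Growing replaces the underlying buffer, so a mark stored on the
            // old ByteBuffer would be lost; the plain int survives the swap.
            ByteBuffer bigger = ByteBuffer.allocate(buf.capacity() * 2);
            buf.flip();
            bigger.put(buf);
            buf = bigger;
        }
        buf.putInt(val);
    }

    void flush() {
        int end = buf.position();
        int len = end - mark;
        if (len > 0) {
            // Slice out [mark, end) without copying.
            buf.position(mark);
            ByteBuffer slice = buf.slice();
            slice.limit(len);
            flushed.add(slice);
        }
        buf.position(end);
        mark = end;
    }
}
```

Each flushed slice shares storage with whichever buffer was current at flush time, so earlier slices stay valid after the buffer is replaced.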
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665845747
@ijuma you're right, I meant the consumer perf test. I updated my comment to clarify.
----------------------------------------------------------------
[GitHub] [kafka] dajac commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456447554
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
Yeah, `Optional` support would be awesome. I was actually thinking how to do it. I may give it a shot during the weekend ;)
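For context, the sentinel-to-`Optional` conversion in the hunk above can be sketched in isolation. The constant value `-1` and the class name are assumptions standing in for `RecordBatch.NO_PARTITION_LEADER_EPOCH`. The reverse direction is included only to show what generated `Optional` support would have to do.

```java
import java.util.Optional;

// Hypothetical sketch of mapping a sentinel-encoded field to Optional and back.
final class LeaderEpochSketch {
    // Assumed sentinel; stands in for RecordBatch.NO_PARTITION_LEADER_EPOCH.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    // Wire value -> Optional: the sentinel becomes Optional.empty().
    static Optional<Integer> toOptional(int currentLeaderEpoch) {
        return Optional.of(currentLeaderEpoch)
            .filter(epoch -> epoch != NO_PARTITION_LEADER_EPOCH);
    }

    // Optional -> wire value: an empty Optional becomes the sentinel again.
    static int fromOptional(Optional<Integer> leaderEpoch) {
        return leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH);
    }
}
```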
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
@mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
```java
private void ensureCapacity(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = buf.length;
    int minGrowth = minCapacity - oldCapacity;
    if (minGrowth > 0) {
        buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                minGrowth, oldCapacity /* preferred growth */));
    }
}
```
The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` makes sense if we do have an estimate (which is the case in current usage, I believe).
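A quick way to see why the growth factor matters when there is no size estimate is to count reallocations. The sketch below uses a deliberately simplified growth rule; it is an assumption for illustration and not the exact logic of `ArraysSupport.newLength` or `ByteBufferOutputStream`.

```java
// Hypothetical sketch comparing growth factors by counting buffer reallocations.
final class GrowthSketch {
    // Counts how many reallocations are needed to grow from `initial` to at
    // least `target` bytes when each step multiplies the capacity by `factor`.
    static int reallocations(int initial, int target, double factor) {
        int capacity = initial;
        int count = 0;
        while (capacity < target) {
            // Always grow by at least one byte so small capacities make progress.
            capacity = Math.max(capacity + 1, (int) (capacity * factor));
            count++;
        }
        return count;
    }
}
```

Starting from 32 bytes and growing to 1 MiB, doubling needs 15 reallocations, while a factor of `1.1` needs many more, each one copying the existing contents.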
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456450107
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
}
public boolean isFromFollower() {
- return replicaId >= 0;
+ return replicaId() >= 0;
}
public IsolationLevel isolationLevel() {
- return isolationLevel;
+ return IsolationLevel.forId(data.isolationLevel());
}
public FetchMetadata metadata() {
return metadata;
}
public String rackId() {
- return rackId;
+ return data.rackId();
}
public static FetchRequest parse(ByteBuffer buffer, short version) {
- return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+ ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
Review comment:
I believe this is also addressed in @ijuma's PR.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662805571
Latest FetchResponse benchmark
trunk
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchResponseBenchmark.testConstructFetchResponse 3 10 avgt 15 2.126 ± 0.408 ns/op
FetchResponseBenchmark.testSerializeFetchResponse 3 10 avgt 15 19753.993 ± 3668.755 ns/op
JMH benchmarks done
```
this branch
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchResponseBenchmark.testConstructFetchResponse 3 10 avgt 15 1165.485 ± 62.632 ns/op
FetchResponseBenchmark.testSerializeFetchResponse 3 10 avgt 15 6468.729 ± 230.405 ns/op
JMH benchmarks done
```
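So a pretty good reduction overall: serialization drops from roughly 19.8 µs/op to 6.5 µs/op, while construction goes from about 2 ns/op to about 1.2 µs/op.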
----------------------------------------------------------------
[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947
I ran the consumer perf test (at @hachikuji's suggestion) and took a profile. Throughput was around 500 MB/s on both trunk and this branch.
![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
Zoomed in a bit on the records part:
![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code.
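For readers unfamiliar with the sendfile path: on the JVM it is reached through `FileChannel.transferTo`. The sketch below is a minimal, standalone illustration with hypothetical names and is not Kafka's record-set transfer code. Its in-memory target does not exercise zero-copy; whether the OS-level `sendfile` is used depends on the platform and the target channel type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of a file-to-channel transfer using FileChannel.transferTo.
final class TransferToSketch {
    // Copies a whole file to the target channel, looping because a single
    // transferTo call may move fewer bytes than requested.
    static long transferAll(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = channel.size();
            while (position < size) {
                position += channel.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    // Writes the payload to a temp file and transfers it into an in-memory sink.
    static byte[] roundTrip(byte[] payload) {
        try {
            Path file = Files.createTempFile("transfer-sketch", ".bin");
            try {
                Files.write(file, payload);
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                try (WritableByteChannel target = Channels.newChannel(sink)) {
                    transferAll(file, target);
                }
                return sink.toByteArray();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```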
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665828744
What were the throughput numbers? I assume you meant the consumer perf test, not the console consumer.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666374043
retest this please
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457513713
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Looking a bit more, it seems like this will be mostly used by the data that precedes the actual records. Do we have a sense for what's the typical size for that? If we do, we can use that in the initial size and we can keep the `1.1` growth.
[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658918847
Added some basic jmh benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests are using 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to try and better isolate the serialization time.
On `trunk`:
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testConstructFetchRequest 20 1000 avgt 30 3.591 ± 0.046 ns/op
FetchRequestBenchmark.testSerializeFetchRequest 20 1000 avgt 30 10049872.274 ± 440324.738 ns/op
FetchResponseBenchmark.testConstructFetchResponse 20 1000 avgt 30 1.911 ± 0.018 ns/op
FetchResponseBenchmark.testSerializeFetchResponse 20 1000 avgt 30 13693835.230 ± 150935.356 ns/op
```
On this branch:
```
Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testConstructFetchRequest 20 1000 avgt 30 4809813.661 ± 190773.702 ns/op
FetchRequestBenchmark.testSerializeFetchRequest 20 1000 avgt 30 4646758.697 ± 551449.969 ns/op
FetchResponseBenchmark.testConstructFetchResponse 20 1000 avgt 30 2507813.886 ± 17457.127 ns/op
FetchResponseBenchmark.testSerializeFetchResponse 20 1000 avgt 30 7231935.691 ± 461221.717 ns/op
```
As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to the conversion to the existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.
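To make the totals concrete, the small sketch below adds the construction and serialization scores from the two tables above and reports the relative change. The class and method names are hypothetical; only the numbers come from the benchmark output. By these figures the sums work out to roughly 6% lower for FetchRequest and roughly 29% lower for FetchResponse on this branch, keeping in mind that these are laptop runs with sizeable error bars.

```java
// Hypothetical helper for reading the JMH tables above; not part of the PR.
final class FetchBenchmarkTotals {

    // Total time per operation is construction plus serialization (both in ns/op).
    static double total(double constructNs, double serializeNs) {
        return constructNs + serializeNs;
    }

    // Relative change from `before` to `after`, in percent (negative means faster).
    static double percentChange(double before, double after) {
        return (after - before) / before * 100.0;
    }

    public static void main(String[] args) {
        double requestTrunk = total(3.591, 10049872.274);
        double requestBranch = total(4809813.661, 4646758.697);
        double responseTrunk = total(1.911, 13693835.230);
        double responseBranch = total(2507813.886, 7231935.691);

        System.out.printf("FetchRequest:  %.0f ns -> %.0f ns (%.1f%%)%n",
                requestTrunk, requestBranch, percentChange(requestTrunk, requestBranch));
        System.out.printf("FetchResponse: %.0f ns -> %.0f ns (%.1f%%)%n",
                responseTrunk, responseBranch, percentChange(responseTrunk, responseBranch));
    }
}
```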
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455094448
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
Yes, I think it's a good idea. However, it would expand the scope of this change quite a bit. I'm working on some micro benchmarks now, and if we don't have any apparent regressions then I'll save this for a follow-on PR.
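As a side note on the hunk above, the leader-epoch handling relies on `Optional.of(...).filter(...)` to turn a sentinel value into an empty `Optional`. A minimal standalone sketch of that idiom follows. The class name is hypothetical, and the constant is a local stand-in that assumes `RecordBatch.NO_PARTITION_LEADER_EPOCH` is `-1`.

```java
import java.util.Optional;

// Hypothetical illustration of the sentinel-to-Optional idiom used in toPartitionDataMap.
final class LeaderEpochSketch {

    // Assumption: local stand-in for RecordBatch.NO_PARTITION_LEADER_EPOCH.
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    // Returns an empty Optional when the wire value is the "no epoch" sentinel,
    // otherwise an Optional holding the epoch.
    static Optional<Integer> toOptionalEpoch(int currentLeaderEpoch) {
        return Optional.of(currentLeaderEpoch)
                .filter(epoch -> epoch != NO_PARTITION_LEADER_EPOCH);
    }

    public static void main(String[] args) {
        System.out.println(toOptionalEpoch(-1).isPresent());
        System.out.println(toOptionalEpoch(5).isPresent());
    }
}
```

Each call boxes the epoch and allocates an `Optional`, which is part of why construction cost grows with the number of partitions converted this way.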
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-660399783
Recent test failures are due to removal of the static `parse` method on FetchRequest (it's only used via serialization in a test, so IntelliJ "usages" didn't catch it).
[GitHub] [kafka] abbccdda commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453856418
##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -47,33 +47,35 @@
"about": "The top level response error code." },
{ "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
"about": "The fetch session ID, or 0 if this is not part of a fetch session." },
- { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+ { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
Review comment:
s/Responses/TopicResponses?
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457562415
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Sounds good.
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162
##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
- { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
+ { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
Review comment:
I guess the implicit expectation is that if the protocol does not support the `read_committed` isolation level, then it wouldn't have transactional data anyway, so reverting to `read_uncommitted` is safe. Can't find a fault with that.
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
case PRODUCE:
return new ProduceRequest(struct, apiVersion);
case FETCH:
- return new FetchRequest(struct, apiVersion);
+ return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);
Review comment:
nit: any reason not to stick with the same constructor convention as the other requests?
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteBuffer buffer;
+ private int mark;
+
+ public RecordsWriter(String dest, int totalSize, Consumer<Send> sendConsumer) {
Review comment:
Could we rename `totalSize` so that it is clear that it does not cover the record sizes? Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that.
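To illustrate why the buffer only needs to cover the non-record bytes, here is a simplified standalone model of the interleaving described in the class Javadoc. It is a sketch under stated assumptions, not the PR's implementation: `InterleavedWriterSketch` is a hypothetical name, plain `ByteBuffer` chunks stand in for Kafka's `Send` objects, and record sets are passed through untouched rather than wrapped in a transfer object.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a writer that interleaves buffered fields with record sets.
final class InterleavedWriterSketch {
    private final ByteBuffer buffer;          // holds only the non-record fields
    private final Consumer<ByteBuffer> sink;  // stands in for Consumer<Send>
    private int mark = 0;                     // start of the bytes not yet flushed

    InterleavedWriterSketch(int totalNonRecordSize, Consumer<ByteBuffer> sink) {
        this.buffer = ByteBuffer.allocate(totalNonRecordSize);
        this.sink = sink;
    }

    void writeInt(int val) {
        buffer.putInt(val);
    }

    // Flush pending field bytes first, then hand the records over without copying them.
    void writeRecords(ByteBuffer records) {
        flush();
        sink.accept(records.duplicate());
    }

    // Emit the bytes written since the last flush as one chunk.
    void flush() {
        int end = buffer.position();
        if (end > mark) {
            ByteBuffer pending = buffer.duplicate();
            pending.position(mark);
            pending.limit(end);
            sink.accept(pending.slice());
            mark = end;
        }
    }

    // Replays the Javadoc example and returns the size of each emitted chunk.
    static List<Integer> demoChunkSizes() {
        List<ByteBuffer> chunks = new ArrayList<>();
        // Three ints are written, so 12 bytes of non-record data in total.
        InterleavedWriterSketch writer = new InterleavedWriterSketch(12, chunks::add);
        writer.writeInt(10);
        writer.writeRecords(ByteBuffer.wrap(new byte[100]));
        writer.writeInt(20);
        writer.writeRecords(ByteBuffer.wrap(new byte[200]));
        writer.writeInt(30);
        writer.flush();

        List<Integer> sizes = new ArrayList<>();
        for (ByteBuffer chunk : chunks) {
            sizes.add(chunk.remaining());
        }
        return sizes;
    }
}
```

In this model the 300 bytes of records never pass through the internal buffer, so a 12-byte allocation suffices. A name like `totalNonRecordSize` describes that contract more accurately than `totalSize`.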
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
Review comment:
nit: wonder if this should be `RecordsWritable` for consistency with `Writable`.
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+ private final ByteBuffer buf;
+
+ public RecordsReader(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public byte readByte() {
+ return buf.get();
+ }
+
+ @Override
+ public short readShort() {
+ return buf.getShort();
+ }
+
+ @Override
+ public int readInt() {
+ return buf.getInt();
+ }
+
+ @Override
+ public long readLong() {
+ return buf.getLong();
+ }
+
+ @Override
+ public double readDouble() {
+ return ByteUtils.readDouble(buf);
+ }
+
+ @Override
+ public void readArray(byte[] arr) {
+ buf.get(arr);
+ }
+
+ @Override
+ public int readUnsignedVarint() {
+ return ByteUtils.readUnsignedVarint(buf);
+ }
+
+ @Override
+ public ByteBuffer readByteBuffer(int length) {
Review comment:
More of a side question, but is this length guaranteed to be less than the buffer size? Wondering if it is worth adding range checking.
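One possible shape for the range check raised here is sketched below. It is a hypothetical standalone helper, not the PR's `RecordsReader` code: the method takes the buffer as a parameter, and the choice of `IllegalArgumentException` is an assumption rather than an established Kafka convention.

```java
import java.nio.ByteBuffer;

// Hypothetical range-checked variant of a readByteBuffer-style method.
final class RangeCheckedReadSketch {

    // Returns a view of the next `length` bytes and advances `buf` past them.
    // Rejects lengths that are negative or exceed what is left in the buffer.
    static ByteBuffer readByteBuffer(ByteBuffer buf, int length) {
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("Requested " + length
                    + " bytes but only " + buf.remaining() + " remain");
        }
        ByteBuffer res = buf.slice();   // view starting at the current position
        res.limit(length);              // restrict the view to the requested length
        buf.position(buf.position() + length);
        return res;
    }
}
```

Without the explicit check, an oversized length would typically surface as an `IllegalArgumentException` from `limit` or `position` with a less descriptive message. The check mainly buys a clearer error for malformed input.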
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.data = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return data.toStruct(version);
}
@Override
- protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
-
- // write the total size and the response header
- ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
- buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
- responseHeaderStruct.writeTo(buffer);
+ public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ int totalRecordSize = data.responses().stream()
+ .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+ .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+ .sum();
+ int totalMessageSize = data.size(cache, apiVersion);
+
+ RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+ data.write(writer, cache, apiVersion);
+ writer.flush();
Review comment:
nit: not a big deal, but I feel like calling `flush` should really be the responsibility of `write`.
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.data = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return data.toStruct(version);
}
@Override
- protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
-
- // write the total size and the response header
- ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
- buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
- responseHeaderStruct.writeTo(buffer);
+ public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ int totalRecordSize = data.responses().stream()
+ .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+ .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+ .sum();
+ int totalMessageSize = data.size(cache, apiVersion);
+
+ RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+ data.write(writer, cache, apiVersion);
+ writer.flush();
+
+ // Compute the total size of all the Sends and write it out along with the header in the first Send
+ ResponseHeaderData responseHeaderData = responseHeader.data();
+
+ int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+ int bodySize = (int) sends.stream().mapToLong(Send::size).sum();
Review comment:
Instead of the cast, could we add a validation check?
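One possible shape for such a check, as a sketch only: sum the sizes as `long` and reject totals that do not fit in an `int` before narrowing. `SizeCheck` and `totalSize` are made-up names, and the list of sizes stands in for the values returned by `Send::size`.

```java
import java.util.Arrays;
import java.util.List;

final class SizeCheck {
    // Hypothetical helper: sums per-send sizes and fails loudly instead of silently truncating.
    static int totalSize(List<Long> sendSizes) {
        long total = sendSizes.stream().mapToLong(Long::longValue).sum();
        if (total < 0 || total > Integer.MAX_VALUE) {
            throw new IllegalStateException("Total size does not fit in an int: " + total);
        }
        return (int) total; // safe: the range was validated above
    }

    public static void main(String[] args) {
        System.out.println(totalSize(Arrays.asList(100L, 200L)));
    }
}
```

`Math.toIntExact(long)` would be a shorter alternative; it throws `ArithmeticException` on overflow, at the cost of a less specific error message.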
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.protocol.ApiMessage;
+
public interface AbstractRequestResponse {
+ /**
+ * Return the auto-generated `Message` instance if this request/response relies on one for
+ * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+ default ApiMessage data() {
Review comment:
@mumrah Do we need this for this PR or can we leave this for #7409?
##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
- { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
+ { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
"about": "The top level response error code." },
{ "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
"about": "The fetch session ID, or 0 if this is not part of a fetch session." },
- { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+ { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
- { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
+ { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
"about": "The topic partitions.", "fields": [
- { "name": "PartitionIndex", "type": "int32", "versions": "0+",
- "about": "The partiiton index." },
- { "name": "ErrorCode", "type": "int16", "versions": "0+",
- "about": "The error code, or 0 if there was no fetch error." },
- { "name": "HighWatermark", "type": "int64", "versions": "0+",
- "about": "The current high water mark." },
- { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
- "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
- { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
- "about": "The current log start offset." },
- { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
- "about": "The aborted transactions.", "fields": [
- { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
- "about": "The producer id associated with the aborted transaction." },
- { "name": "FirstOffset", "type": "int64", "versions": "4+",
- "about": "The first offset in the aborted transaction." }
+ { "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
+ "fields": [
+ { "name": "Partition", "type": "int32", "versions": "0+",
+ "about": "The partition index." },
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or 0 if there was no fetch error." },
+ { "name": "HighWatermark", "type": "int64", "versions": "0+",
+ "about": "The current high water mark." },
+ { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
+ "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
+ { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
+ "about": "The current log start offset." },
+ { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
+ "about": "The aborted transactions.", "fields": [
+ { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+ "about": "The producer id associated with the aborted transaction." },
+ { "name": "FirstOffset", "type": "int64", "versions": "4+",
+ "about": "The first offset in the aborted transaction." }
+ ]},
+ { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true,
Review comment:
I'm wondering if this should be ignorable. When this is set, the leader returns no data, so it relies crucially on the follower redirection.
----------------------------------------------------------------
[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576
Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.
On this branch:
```
Benchmark                                                   (partitionCount)  (topicCount)  Mode  Cnt     Score    Error  Units
FetchRequestBenchmark.testFetchRequestForConsumer                          3            10  avgt   15  2110.741 ± 27.935  ns/op
FetchRequestBenchmark.testFetchRequestForReplica                           3            10  avgt   15  2021.114 ±  7.816  ns/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer                 3            10  avgt   15  3452.799 ± 16.013  ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica                  3            10  avgt   15  3691.157 ± 60.260  ns/op

GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 4295.532 ± 56.061 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 9984.000 ± 0.001 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 4292.525 ± 56.341 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9977.037 ± 28.311 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.187 ± 0.027 MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.435 ± 0.060 B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 2335.000 counts
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 3 10 avgt 15 1375.000 ms
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 4416.855 ± 16.429 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 9832.000 ± 0.001 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 4417.032 ± 24.858 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9832.358 ± 28.932 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.186 ± 0.015 MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.415 ± 0.033 B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 2280.000 counts
FetchRequestBenchmark.testFetchRequestForReplica:·gc.time 3 10 avgt 15 1376.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 3256.172 ± 15.524 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 12384.000 ± 0.001 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 3255.019 ± 21.484 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 12379.587 ± 49.161 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.122 ± 0.022 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.462 ± 0.084 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2054.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1389.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 3319.965 ± 53.427 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 13496.000 ± 0.001 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 3320.125 ± 52.812 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 13496.813 ± 64.774 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.126 ± 0.021 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.512 ± 0.085 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2122.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time 3 10 avgt 15 1395.000 ms
```
On trunk:
```
Benchmark                                                   (partitionCount)  (topicCount)  Mode  Cnt      Score    Error  Units
FetchRequestBenchmark.testFetchRequestForConsumer                          3            10  avgt   15      3.457 ±  0.016  ns/op
FetchRequestBenchmark.testFetchRequestForReplica                           3            10  avgt   15      3.453 ±  0.035  ns/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer                 3            10  avgt   15  13214.306 ± 61.158  ns/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica                  3            10  avgt   15  13147.870 ± 52.318  ns/op

GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 ≈ 0 counts
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 ≈ 0 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 1795.576 ± 8.351 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 1796.108 ± 11.527 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26143.702 ± 100.832 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.163 ± 0.019 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.366 ± 0.270 B/op
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2134.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1412.000 ms
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 1804.695 ± 7.193 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 1805.666 ± 7.990 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26150.127 ± 86.455 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.166 ± 0.016 MB/sec
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.406 ± 0.238 B/op
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2097.000 counts
FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time 3 10 avgt 15 1395.000 ms
```
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947
I ran a console consumer perf test (at @hachikuji's suggestion) and took a profile.
![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
Zoomed in a bit on the records part:
![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code.
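For readers unfamiliar with that path: on the JVM it is reached through `FileChannel.transferTo`, which can hand the copy to the operating system when the target is a socket channel. The sketch below only illustrates the call pattern. `TransferToSketch`, `transferAll` and `demo` are hypothetical names, and because the target here is an in-memory channel rather than a socket, no sendfile optimization is actually exercised.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class TransferToSketch {
    // Transfers the whole file to the target channel. A single transferTo call
    // may move fewer bytes than requested, hence the loop.
    static long transferAll(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                long sent = source.transferTo(position, size - position, target);
                if (sent <= 0) {
                    break; // nothing more could be transferred
                }
                position += sent;
            }
            return position;
        }
    }

    // Writes a small temp file, transfers it, and returns what the target received.
    static String demo() {
        try {
            Path file = Files.createTempFile("records", ".log");
            try {
                Files.write(file, "hello records".getBytes(StandardCharsets.UTF_8));
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                transferAll(file, Channels.newChannel(sink));
                return new String(sink.toByteArray(), StandardCharsets.UTF_8);
            } finally {
                Files.delete(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```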
----------------------------------------------------------------
[GitHub] [kafka] mumrah merged pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #9008:
URL: https://github.com/apache/kafka/pull/9008
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-661049255
retest this please
----------------------------------------------------------------
[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656906624
I agree that some benchmarks would be useful. One of the key differences is how the `MultiRecordsSend` gets constructed, so that is probably one thing worth measuring. Potentially we are faster here because we no longer have the conversion to `Struct`.
----------------------------------------------------------------
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457546281
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Thanks for the explanation, @ijuma. I missed the semantics of `newLength`.
After the first few top-level fields, each partition will have roughly 38 bytes preceding its records. Maybe we could increase the initial capacity to 64 bytes?
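A simplified sketch of the buffering pattern under discussion, with the 64-byte initial capacity floated above. This is not the `RecordsWriter` from the PR: `SegmentWriterSketch` is a hypothetical class, records are modelled as plain `ByteBuffer`s instead of `BaseRecords`, and skipping empty flushes is a choice made only for this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class SegmentWriterSketch {
    private final Consumer<ByteBuffer> segmentConsumer;
    // 64 is the capacity suggested in the comment; ByteArrayOutputStream defaults to 32 bytes.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(64);

    SegmentWriterSketch(Consumer<ByteBuffer> segmentConsumer) {
        this.segmentConsumer = segmentConsumer;
    }

    // Non-record fields are appended to the pending byte array.
    void writeInt(int val) {
        pending.write(ByteBuffer.allocate(4).putInt(val).array(), 0, 4);
    }

    // Records are never copied: flush the pending fields, then pass the records through as-is.
    void writeRecords(ByteBuffer records) {
        flush();
        segmentConsumer.accept(records);
    }

    // Emit pending bytes as one segment and reset the buffer (empty flushes are skipped here).
    void flush() {
        if (pending.size() > 0) {
            segmentConsumer.accept(ByteBuffer.wrap(pending.toByteArray()));
            pending.reset();
        }
    }

    // Mirrors the sequence from the class Javadoc: int, records, int, records, int, flush.
    static List<ByteBuffer> demo(ByteBuffer records1, ByteBuffer records2) {
        List<ByteBuffer> segments = new ArrayList<>();
        SegmentWriterSketch writer = new SegmentWriterSketch(segments::add);
        writer.writeInt(10);
        writer.writeRecords(records1);
        writer.writeInt(20);
        writer.writeRecords(records2);
        writer.writeInt(30);
        writer.flush();
        return segments;
    }

    public static void main(String[] args) {
        System.out.println(demo(ByteBuffer.allocate(8), ByteBuffer.allocate(8)).size() + " segments");
    }
}
```

With roughly 38 bytes of fields between record sets, a 64-byte initial capacity would avoid a resize of the backing array in the common case, which is the motivation for the suggestion.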
----------------------------------------------------------------
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456196331
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
##########
@@ -35,6 +36,10 @@
int readUnsignedVarint();
ByteBuffer readByteBuffer(int length);
+ default BaseRecords readRecords(int length) {
+ throw new UnsupportedOperationException("Not implemented");
+ }
Review comment:
Are this method and the corresponding `writeRecords` really needed in the base interface? It seems like they're only implemented in two specific cases. Could we not downcast for those cases?
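A rough sketch of what the downcast alternative could look like: the base interface stays free of records methods, and only the call site that needs records checks for the richer type. All names here (`DowncastSketch`, `FieldReader`, `RecordsReader`, `BufferReader`) are hypothetical and are not the `Readable` interface from the PR.

```java
import java.nio.ByteBuffer;

final class DowncastSketch {
    // Base interface without any records-specific method.
    interface FieldReader {
        int readInt();
    }

    // Only readers that can hand out record sets implement this.
    interface RecordsReader extends FieldReader {
        ByteBuffer readRecords(int length);
    }

    static final class BufferReader implements RecordsReader {
        private final ByteBuffer buf;

        BufferReader(ByteBuffer buf) {
            this.buf = buf;
        }

        @Override
        public int readInt() {
            return buf.getInt();
        }

        @Override
        public ByteBuffer readRecords(int length) {
            // Return a view over the next `length` bytes and advance past them.
            ByteBuffer slice = buf.slice();
            slice.limit(length);
            buf.position(buf.position() + length);
            return slice;
        }
    }

    // Reads a length-prefixed record set, downcasting where records support is required.
    static ByteBuffer readRecordSet(FieldReader reader) {
        int length = reader.readInt();
        if (!(reader instanceof RecordsReader)) {
            throw new UnsupportedOperationException("Reader does not support records");
        }
        return ((RecordsReader) reader).readRecords(length);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(7);
        buf.putInt(3).put(new byte[] {1, 2, 3});
        buf.flip();
        System.out.println(readRecordSet(new BufferReader(buf)).remaining());
    }
}
```

The trade-off is that the unsupported case is detected at the call site at runtime rather than through a default method on the interface.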
----------------------------------------------------------------
[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659673793
> Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.
>
> On this branch:
>
> ```
> Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
> FetchRequestBenchmark.testFetchRequestForConsumer 3 10 avgt 15 2110.741 ± 27.935 ns/op
> FetchRequestBenchmark.testFetchRequestForReplica 3 10 avgt 15 2021.114 ± 7.816 ns/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer 3 10 avgt 15 3452.799 ± 16.013 ns/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica 3 10 avgt 15 3691.157 ± 60.260 ns/op
>
> GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 4295.532 ± 56.061 MB/sec
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 9984.000 ± 0.001 B/op
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 4292.525 ± 56.341 MB/sec
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9977.037 ± 28.311 B/op
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.187 ± 0.027 MB/sec
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.435 ± 0.060 B/op
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 2335.000 counts
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 3 10 avgt 15 1375.000 ms
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 4416.855 ± 16.429 MB/sec
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 9832.000 ± 0.001 B/op
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 4417.032 ± 24.858 MB/sec
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 9832.358 ± 28.932 B/op
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.186 ± 0.015 MB/sec
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.415 ± 0.033 B/op
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 2280.000 counts
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.time 3 10 avgt 15 1376.000 ms
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 3256.172 ± 15.524 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 12384.000 ± 0.001 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 3255.019 ± 21.484 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 12379.587 ± 49.161 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.122 ± 0.022 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.462 ± 0.084 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2054.000 counts
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1389.000 ms
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 3319.965 ± 53.427 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 13496.000 ± 0.001 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 3320.125 ± 52.812 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 13496.813 ± 64.774 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.126 ± 0.021 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 0.512 ± 0.085 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2122.000 counts
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time 3 10 avgt 15 1395.000 ms
> ```
>
> On trunk:
>
> ```
> Benchmark (partitionCount) (topicCount) Mode Cnt Score Error Units
> FetchRequestBenchmark.testFetchRequestForConsumer 3 10 avgt 15 3.457 ± 0.016 ns/op
> FetchRequestBenchmark.testFetchRequestForReplica 3 10 avgt 15 3.453 ± 0.035 ns/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer 3 10 avgt 15 13214.306 ± 61.158 ns/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica 3 10 avgt 15 13147.870 ± 52.318 ns/op
>
> GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
> FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 3 10 avgt 15 ≈ 0 counts
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 ≈ 10⁻⁴ MB/sec
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 ≈ 10⁻⁶ B/op
> FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 3 10 avgt 15 ≈ 0 counts
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 3 10 avgt 15 1795.576 ± 8.351 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 3 10 avgt 15 1796.108 ± 11.527 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26143.702 ± 100.832 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.163 ± 0.019 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.366 ± 0.270 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count 3 10 avgt 15 2134.000 counts
> FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time 3 10 avgt 15 1412.000 ms
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate 3 10 avgt 15 1804.695 ± 7.193 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm 3 10 avgt 15 26136.002 ± 0.005 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space 3 10 avgt 15 1805.666 ± 7.990 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 3 10 avgt 15 26150.127 ± 86.455 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space 3 10 avgt 15 0.166 ± 0.016 MB/sec
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 3 10 avgt 15 2.406 ± 0.238 B/op
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count 3 10 avgt 15 2097.000 counts
> FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time 3 10 avgt 15 1395.000 ms
> ```
Nice, so roughly for the replica fetch:
```
2021.114 + 3691.157 = 5712.271 ns
vs
3.453 + 13147.870 = 13151.323 ns
```
57% reduction in CPU time.
Alloc rate normalized comparison:
```
9832.000 + 13496.000 = 23328 B/op
vs
26136.002 B/op
```
10.74% reduction in garbage generation.
I think the garbage generation will massively improve once we can get rid of toPartitionDataMap later.
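The CPU-time comparison can be re-derived from the Score column of the two tables. A minimal sketch for the replica path (request construction plus serialization); the class and method names are illustrative only.

```java
class ReductionSketch {
    // Fractional reduction when going from `before` to `after`.
    static double reduction(double before, double after) {
        return 1.0 - after / before;
    }

    public static void main(String[] args) {
        // Replica path, ns/op: build the request, then serialize it.
        double branch = 2021.114 + 3691.157; // this branch
        double trunk = 3.453 + 13147.870;    // trunk
        System.out.printf("CPU time reduction: %.1f%%%n", 100 * reduction(trunk, branch));
    }
}
```

With these inputs the reduction comes out at roughly 57%.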
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456518529
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Yea, it's possible, but rather complicated I think. We would need to manage our own byte array and grow it on-demand (like what happens in ByteArrayOutputStream). Then we could use `ByteBuffer#slice` to pass views of this array to the ByteBufferSend objects. I don't think this current approach is any worse than before in terms of array allocations, so maybe we could save this for a future optimization?
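A rough sketch of the alternative described here, assuming a self-managed buffer that grows on demand and hands out zero-copy views of the bytes written since the last flush. `SlicingBufferSketch` and its methods are hypothetical; this is not code from the PR, and it leaves out the `Send` wrapping entirely.

```java
import java.nio.ByteBuffer;

class SlicingBufferSketch {
    private ByteBuffer buffer;
    private int flushedUpTo = 0; // index of the first byte not yet handed out

    SlicingBufferSketch(int initialCapacity) {
        this.buffer = ByteBuffer.allocate(initialCapacity);
    }

    void writeInt(int value) {
        ensureRemaining(Integer.BYTES);
        buffer.putInt(value);
    }

    // Grow on demand, similar in spirit to ByteArrayOutputStream. Only pending bytes are
    // copied; views handed out earlier keep referencing the old backing array.
    private void ensureRemaining(int needed) {
        if (buffer.remaining() >= needed) {
            return;
        }
        ByteBuffer pending = pendingView();
        int newCapacity = Math.max(buffer.capacity() * 2, pending.remaining() + needed);
        ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
        bigger.put(pending);
        buffer = bigger;
        flushedUpTo = 0;
    }

    // A view over the bytes written since the last flush; it shares the backing array.
    private ByteBuffer pendingView() {
        ByteBuffer view = buffer.duplicate();
        view.position(flushedUpTo);
        view.limit(buffer.position());
        return view.slice();
    }

    // Hand out the pending bytes as a slice; nothing is copied.
    ByteBuffer flush() {
        ByteBuffer view = pendingView();
        flushedUpTo = buffer.position();
        return view;
    }

    public static void main(String[] args) {
        SlicingBufferSketch out = new SlicingBufferSketch(4);
        out.writeInt(10);
        ByteBuffer first = out.flush();
        out.writeInt(20); // forces a grow; `first` is unaffected
        ByteBuffer second = out.flush();
        System.out.println(first.getInt(0) + " " + second.getInt(0)); // 10 20
    }
}
```

The extra bookkeeping (tracking the flushed offset and copying pending bytes on growth) is the complication the comment refers to.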
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455828890
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
+ }));
+ return Collections.unmodifiableMap(result);
+ }
+
+ private List<TopicPartition> toForgottonTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {
Review comment:
Typo "Forgotton"
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455407043
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
}
}
+ private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+ Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+ fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+ Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+ .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+ new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+ fetchPartition.partitionMaxBytes(), leaderEpoch));
Review comment:
As an aside, it would be awesome to add `Optional` support to the generated classes. We have had so many bugs which were caused by sentinel values sneaking into unexpected places.
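To illustrate the sentinel problem, here is a small sketch of confining the sentinel to the wire boundary and exposing `Optional` everywhere else. The constant is a local stand-in for `RecordBatch.NO_PARTITION_LEADER_EPOCH` (assumed to be -1 here), and `fromWire`/`toWire` are hypothetical helper names; generated classes with built-in `Optional` support would make such helpers unnecessary.

```java
import java.util.Optional;

class SentinelSketch {
    // Local stand-in for RecordBatch.NO_PARTITION_LEADER_EPOCH (assumed value).
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    // Wire value -> Optional, the same shape as the conversion in toPartitionDataMap.
    static Optional<Integer> fromWire(int epoch) {
        return epoch == NO_PARTITION_LEADER_EPOCH ? Optional.empty() : Optional.of(epoch);
    }

    // Optional -> wire value, so the sentinel never leaves the serialization layer.
    static int toWire(Optional<Integer> epoch) {
        return epoch.orElse(NO_PARTITION_LEADER_EPOCH);
    }

    public static void main(String[] args) {
        System.out.println(fromWire(-1).isPresent());  // false
        System.out.println(fromWire(5).get());         // 5
        System.out.println(toWire(Optional.empty()));  // -1
    }
}
```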
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456197321
##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
field.camelCaseName(), field.camelCaseName());
}
+ } else if (field.type().isRecords()) {
+ // TODO is this valid for record instances?
Review comment:
The hashCode of `MemoryRecords` takes into account the buffer position, so it's kind of useless. `FileRecords` doesn't even define it. We should consider defining the hashCode and equals of `Records` to be identity based.
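The position dependence is easy to reproduce with a plain `ByteBuffer`, whose `hashCode` covers only the remaining bytes. The sketch below assumes, as the comment implies, that the records class derives its hash from its buffer; `IdentityRecords` is a hypothetical holder that simply does not override `equals` or `hashCode`, which is one way to get identity-based semantics.

```java
import java.nio.ByteBuffer;

class RecordsHashSketch {
    // Hypothetical holder that keeps Object's identity-based equals and hashCode.
    static final class IdentityRecords {
        final ByteBuffer buffer;

        IdentityRecords(ByteBuffer buffer) {
            this.buffer = buffer;
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        int before = buffer.hashCode();
        buffer.get(); // advance the position by one byte
        int after = buffer.hashCode();
        System.out.println(before == after); // false: the hash covers only the remaining bytes

        IdentityRecords records = new IdentityRecords(buffer);
        int identityBefore = records.hashCode();
        buffer.get();
        System.out.println(identityBefore == records.hashCode()); // true: unaffected by the position
    }
}
```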
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457546281
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
Thanks for the explanation, @ijuma. I missed the semantics of `newLength`.
After the initial few top-level fields, each partition will have something like 38 bytes preceding its records (at a minimum; aborted transactions could increase that). Maybe we could increase the initial capacity to 64 bytes?
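One plausible way to arrive at the 38-byte figure, sketched under the assumption that the per-partition fields preceding the records are the ones listed below in a non-flexible response version. The field list is an editorial assumption, not something stated in the comment, and `PartitionHeaderSizeSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PartitionHeaderSizeSketch {
    // Writes one partition's non-record fields and returns how many bytes they occupy.
    static int headerSize() {
        // 64 is the initial capacity proposed above; the no-arg constructor starts at 32.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(64);
        DataOutputStream out = new DataOutputStream(bytes);
        try {
            out.writeInt(0);    // partition index
            out.writeShort(0);  // error code
            out.writeLong(0L);  // high watermark
            out.writeLong(0L);  // last stable offset
            out.writeLong(0L);  // log start offset
            out.writeInt(0);    // aborted transactions array length (empty)
            out.writeInt(-1);   // preferred read replica
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println(headerSize()); // 38
    }
}
```

At 38 bytes, a 32-byte initial array has to grow once per partition header, while a 64-byte array does not, as long as the aborted-transactions list is empty.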
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455318129
##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+ @Param({"1000"})
+ private int topicCount;
+
+ @Param({"20"})
+ private int partitionCount;
+
+ Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+ RequestHeader header;
+
+ FetchRequest request;
+
+
+ @Setup(Level.Trial)
+ public void setup() {
+ this.fetchData = new HashMap<>();
+ for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+ for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+ FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+ 0, 0, 4096, Optional.empty());
+ fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+ }
+ }
+
+ this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+ this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());
Review comment:
Can we please have benchmarks for both forConsumer and forReplica fetch requests?
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666531948
These test failures come from known flaky tests that already have Jira tickets.
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662794475
Thanks @cmccabe, great feedback. I've updated RecordsWriter to allocate a single ByteBuffer based on a pre-calculated length (total message size - all records size). This avoids the buffer resizing altogether.
I like your suggestions for Writable#close and moving readRecords into ByteBufferAccessor. I'll save these for a follow-up.
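The pre-sized approach described above can be sketched roughly as follows. This is a simplified illustration, not the PR's `RecordsWriter`: it passes plain `ByteBuffer`s to the consumer instead of `Send` objects, and `PresizedWriterSketch` and its members are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PresizedWriterSketch {
    private final ByteBuffer buffer;
    private final Consumer<ByteBuffer> sendConsumer;
    private int flushedUpTo = 0; // index of the first byte not yet emitted

    // `nonRecordBytes` plays the role of (total message size - all records size).
    PresizedWriterSketch(int nonRecordBytes, Consumer<ByteBuffer> sendConsumer) {
        this.buffer = ByteBuffer.allocate(nonRecordBytes);
        this.sendConsumer = sendConsumer;
    }

    void writeInt(int value) {
        buffer.putInt(value); // never resizes; overflows only if the size was miscalculated
    }

    // Emit any pending field bytes first, then pass the records through untouched.
    void writeRecords(ByteBuffer records) {
        flush();
        sendConsumer.accept(records);
    }

    // Emit the bytes written since the last flush as a zero-copy slice, if there are any.
    void flush() {
        if (buffer.position() > flushedUpTo) {
            ByteBuffer view = buffer.duplicate();
            view.position(flushedUpTo);
            view.limit(buffer.position());
            sendConsumer.accept(view.slice());
            flushedUpTo = buffer.position();
        }
    }

    public static void main(String[] args) {
        List<ByteBuffer> sends = new ArrayList<>();
        PresizedWriterSketch writer = new PresizedWriterSketch(3 * Integer.BYTES, sends::add);
        writer.writeInt(10);
        writer.writeRecords(ByteBuffer.wrap(new byte[100]));
        writer.writeInt(20);
        writer.writeRecords(ByteBuffer.wrap(new byte[200]));
        writer.writeInt(30);
        writer.flush();
        System.out.println(sends.size()); // 5, matching the Javadoc example
    }
}
```

Because the buffer is sized once up front, every emitted slice shares a single backing array and no intermediate copies are made.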
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.data = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return data.toStruct(version);
}
@Override
- protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
-
- // write the total size and the response header
- ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
- buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
- responseHeaderStruct.writeTo(buffer);
+ public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ int totalRecordSize = data.responses().stream()
+ .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+ .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+ .sum();
+ int totalMessageSize = data.size(cache, apiVersion);
+
+ RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+ data.write(writer, cache, apiVersion);
+ writer.flush();
Review comment:
Yeah, I agree. @cmccabe suggested adding `Writable#close`, which would achieve the same goal. I think that would be nice and would clean things up a bit. I'll open a follow-up PR for this.
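For illustration only, here is a rough sketch of how a close-style hook could make the trailing flush automatic. `SketchWriter` is a hypothetical stand-in (it is not the `RecordsWriter` from this PR), and `Writable#close` is only a suggestion at this point, so the names and shape below are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CloseFlushSketch {
    /** Hypothetical writer: buffers bytes and hands them to a consumer on flush. */
    static final class SketchWriter implements AutoCloseable {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private final Consumer<byte[]> consumer;

        SketchWriter(Consumer<byte[]> consumer) {
            this.consumer = consumer;
        }

        void writeByte(byte val) {
            pending.write(val);
        }

        /** Emit any pending bytes as one chunk and reset the buffer. */
        void flush() {
            if (pending.size() > 0) {
                consumer.accept(pending.toByteArray());
                pending.reset();
            }
        }

        /** Closing flushes, so callers cannot forget the trailing bytes. */
        @Override
        public void close() {
            flush();
        }
    }

    static List<byte[]> writeTwoBytes() {
        List<byte[]> chunks = new ArrayList<>();
        try (SketchWriter writer = new SketchWriter(chunks::add)) {
            writer.writeByte((byte) 1);
            writer.writeByte((byte) 2);
        } // close() runs here and flushes the two pending bytes
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(writeTwoBytes().size());
    }
}
```

With try-with-resources the caller no longer has to remember the final `flush()`, which is the main appeal of the suggestion.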
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
int throttleTimeMs,
int sessionId) {
- this.error = error;
- this.responseData = responseData;
- this.throttleTimeMs = throttleTimeMs;
- this.sessionId = sessionId;
+ this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ this.responseDataMap = responseData;
}
- public static FetchResponse<MemoryRecords> parse(Struct struct) {
- LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
- for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
- Struct topicResponse = (Struct) topicResponseObj;
- String topic = topicResponse.get(TOPIC_NAME);
- for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
- Struct partitionResponse = (Struct) partitionResponseObj;
- Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
- int partition = partitionResponseHeader.get(PARTITION_ID);
- Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
- long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
- long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
- long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
- Optional<Integer> preferredReadReplica = Optional.of(
- partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
- ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
- BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
- if (!(baseRecords instanceof MemoryRecords))
- throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
- MemoryRecords records = (MemoryRecords) baseRecords;
-
- List<AbortedTransaction> abortedTransactions = null;
- if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
- Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
- if (abortedTransactionsArray != null) {
- abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
- for (Object abortedTransactionObj : abortedTransactionsArray) {
- Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
- long producerId = abortedTransactionStruct.get(PRODUCER_ID);
- long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
- abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
- }
- }
- }
-
- PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
- logStartOffset, preferredReadReplica, abortedTransactions, records);
- responseData.put(new TopicPartition(topic, partition), partitionData);
- }
- }
- return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
- struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+ public FetchResponse(FetchResponseData fetchResponseData) {
+ this.data = fetchResponseData;
+ this.responseDataMap = toResponseDataMap(fetchResponseData);
}
@Override
public Struct toStruct(short version) {
- return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+ return data.toStruct(version);
}
@Override
- protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
- Struct responseHeaderStruct = responseHeader.toStruct();
- Struct responseBodyStruct = toStruct(apiVersion);
-
- // write the total size and the response header
- ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
- buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
- responseHeaderStruct.writeTo(buffer);
+ public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+ // Generate the Sends for the response fields and records
+ ArrayDeque<Send> sends = new ArrayDeque<>();
+ ObjectSerializationCache cache = new ObjectSerializationCache();
+ int totalRecordSize = data.responses().stream()
+ .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+ .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+ .sum();
+ int totalMessageSize = data.size(cache, apiVersion);
+
+ RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+ data.write(writer, cache, apiVersion);
+ writer.flush();
+
+ // Compute the total size of all the Sends and write it out along with the header in the first Send
+ ResponseHeaderData responseHeaderData = responseHeader.data();
+
+ int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+ int bodySize = (int) sends.stream().mapToLong(Send::size).sum();
Review comment:
Do you mean something like `Math.toIntExact`?
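To make the difference concrete, a small standalone sketch (the class and method names are made up for this example): a plain `(int)` cast wraps silently on overflow, while `Math.toIntExact` throws an `ArithmeticException`.

```java
import java.util.Arrays;
import java.util.List;

public class ToIntExactSketch {
    /** Sums sizes as long, then narrows to int with an overflow check. */
    static int totalSize(List<Long> sizes) {
        long total = sizes.stream().mapToLong(Long::longValue).sum();
        // Throws ArithmeticException if total does not fit in an int
        return Math.toIntExact(total);
    }

    public static void main(String[] args) {
        System.out.println(totalSize(Arrays.asList(100L, 200L)));

        long tooBig = Integer.MAX_VALUE + 1L;
        // A plain cast silently wraps around to a negative value
        System.out.println((int) tooBig);

        try {
            totalSize(Arrays.asList((long) Integer.MAX_VALUE, 1L));
        } catch (ArithmeticException e) {
            System.out.println("overflow detected");
        }
    }
}
```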
##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
case PRODUCE:
return new ProduceRequest(struct, apiVersion);
case FETCH:
- return new FetchRequest(struct, apiVersion);
+ return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);
Review comment:
I just wanted to remove the Struct constructor of FetchRequest completely. Eventually, `RequestContext#parseRequest(ByteBuffer)` will stop using Structs and pass the message data classes to `AbstractRequest#parseRequest` (or similar).
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+ private final ByteBuffer buf;
+
+ public RecordsReader(ByteBuffer buf) {
+ this.buf = buf;
+ }
+
+ @Override
+ public byte readByte() {
+ return buf.get();
+ }
+
+ @Override
+ public short readShort() {
+ return buf.getShort();
+ }
+
+ @Override
+ public int readInt() {
+ return buf.getInt();
+ }
+
+ @Override
+ public long readLong() {
+ return buf.getLong();
+ }
+
+ @Override
+ public double readDouble() {
+ return ByteUtils.readDouble(buf);
+ }
+
+ @Override
+ public void readArray(byte[] arr) {
+ buf.get(arr);
+ }
+
+ @Override
+ public int readUnsignedVarint() {
+ return ByteUtils.readUnsignedVarint(buf);
+ }
+
+ @Override
+ public ByteBuffer readByteBuffer(int length) {
Review comment:
This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. Either way, the length should always be in range, since this method is used by the zero-copy byte fields in the message classes, e.g.
```java
int len = _reader.readInt();
if (len > 0) {
    this.someZeroCopyField = _reader.readByteBuffer(len);
}
```
So it's generally safe. For a corrupt message where the length is wrong, ByteBuffer#limit will throw and parsing will fail. It would be nice to add a range check to ByteBufferAccessor so we can throw a more useful error.
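A range check along those lines might look roughly like the sketch below. This is a standalone helper, not the actual ByteBufferAccessor code. The slice-and-limit body reflects my understanding of the existing method, and the exception type and message are assumptions chosen for the example.

```java
import java.nio.ByteBuffer;

public class RangeCheckedReadSketch {
    /** Returns a zero-copy slice of the next {@code length} bytes, validating the length first. */
    static ByteBuffer readByteBuffer(ByteBuffer buf, int length) {
        // Hypothetical check: fail early with a descriptive message instead of
        // relying on the exception thrown by ByteBuffer#limit
        if (length < 0 || length > buf.remaining()) {
            throw new IllegalArgumentException("Cannot read " + length
                + " bytes, only " + buf.remaining() + " remaining");
        }
        ByteBuffer res = buf.slice();
        res.limit(length);
        buf.position(buf.position() + length);
        return res;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
        ByteBuffer slice = readByteBuffer(buf, 2);
        System.out.println(slice.remaining() + " bytes read, " + buf.remaining() + " left");
        try {
            readByteBuffer(buf, 10); // corrupt length: larger than what remains
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```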
##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
- { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
+ { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
"about": "The top level response error code." },
{ "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
"about": "The fetch session ID, or 0 if this is not part of a fetch session." },
- { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+ { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
"about": "The response topics.", "fields": [
- { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+ { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
"about": "The topic name." },
- { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
+ { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
"about": "The topic partitions.", "fields": [
- { "name": "PartitionIndex", "type": "int32", "versions": "0+",
- "about": "The partiiton index." },
- { "name": "ErrorCode", "type": "int16", "versions": "0+",
- "about": "The error code, or 0 if there was no fetch error." },
- { "name": "HighWatermark", "type": "int64", "versions": "0+",
- "about": "The current high water mark." },
- { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
- "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
- { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
- "about": "The current log start offset." },
- { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
- "about": "The aborted transactions.", "fields": [
- { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
- "about": "The producer id associated with the aborted transaction." },
- { "name": "FirstOffset", "type": "int64", "versions": "4+",
- "about": "The first offset in the aborted transaction." }
+ { "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
+ "fields": [
+ { "name": "Partition", "type": "int32", "versions": "0+",
+ "about": "The partition index." },
+ { "name": "ErrorCode", "type": "int16", "versions": "0+",
+ "about": "The error code, or 0 if there was no fetch error." },
+ { "name": "HighWatermark", "type": "int64", "versions": "0+",
+ "about": "The current high water mark." },
+ { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
+ "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
+ { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
+ "about": "The current log start offset." },
+ { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
+ "about": "The aborted transactions.", "fields": [
+ { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+ "about": "The producer id associated with the aborted transaction." },
+ { "name": "FirstOffset", "type": "int64", "versions": "4+",
+ "about": "The first offset in the aborted transaction." }
+ ]},
+ { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true,
Review comment:
I see what you mean. If we have a bug that causes us to hit the preferred replica code for an older api version, we should fail to serialize the message rather than sending it to a client that doesn't understand follower redirection.
Good catch.
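As a sketch of the semantics being discussed (this is a hypothetical stand-in, not the generated `FetchResponseData` code, and it uses a JDK exception in place of Kafka's own): for an API version below the field's range, a default value is simply not written, an ignorable field is dropped silently, and a non-ignorable field with a non-default value causes serialization to fail.

```java
public class IgnorableFieldSketch {
    static final int NO_PREFERRED_REPLICA = -1; // "default": "-1" in the schema
    static final short MIN_VERSION = 11;        // "versions": "11+" in the schema

    /**
     * Hypothetical stand-in for the write path of one field: reports whether the
     * value is written, skipped, or rejected for the given API version.
     */
    static String writePreferredReadReplica(int value, short version, boolean ignorable) {
        if (version >= MIN_VERSION) {
            return "written";
        }
        if (value == NO_PREFERRED_REPLICA || ignorable) {
            return "skipped";
        }
        // Stand-in for an unsupported-version failure
        throw new UnsupportedOperationException(
            "Attempted to write a non-default preferredReadReplica at version " + version);
    }

    public static void main(String[] args) {
        System.out.println(writePreferredReadReplica(3, (short) 11, false));
        System.out.println(writePreferredReadReplica(-1, (short) 10, false));
        System.out.println(writePreferredReadReplica(3, (short) 10, true));
        try {
            writePreferredReadReplica(3, (short) 10, false);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The last case is the one that matters here: with `ignorable` set to false, a bug that sets a preferred replica for an older client surfaces as a serialization failure instead of a silently dropped field.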
##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
"about": "The minimum bytes to accumulate in the response." },
{ "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
"about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." },
- { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
+ { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,
Review comment:
I changed this to make the JSON schema match what was previously in FetchRequest.java. During serialization, we would simply stick the isolation level in the Struct regardless of the api version:
```java
struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
```
So even if we were writing out a v3 FetchRequest, whatever value we put here would be ignored and not sent out. There were also some unit tests that utilized this behavior.
Your assessment sounds correct though, so it probably doesn't matter whether it's ignorable or not.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688
##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+ private final String dest;
+ private final Consumer<Send> sendConsumer;
+ private final ByteArrayOutputStream byteArrayOutputStream;
+ private final DataOutput output;
+
+ public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+ this.dest = dest;
+ this.sendConsumer = sendConsumer;
+ this.byteArrayOutputStream = new ByteArrayOutputStream();
+ this.output = new DataOutputStream(this.byteArrayOutputStream);
+ }
+
+ @Override
+ public void writeByte(byte val) {
+ writeQuietly(() -> output.writeByte(val));
+ }
+
+ @Override
+ public void writeShort(short val) {
+ writeQuietly(() -> output.writeShort(val));
+ }
+
+ @Override
+ public void writeInt(int val) {
+ writeQuietly(() -> output.writeInt(val));
+ }
+
+ @Override
+ public void writeLong(long val) {
+ writeQuietly(() -> output.writeLong(val));
+
+ }
+
+ @Override
+ public void writeDouble(double val) {
+ writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+ }
+
+ @Override
+ public void writeByteArray(byte[] arr) {
+ writeQuietly(() -> output.write(arr));
+ }
+
+ @Override
+ public void writeUnsignedVarint(int i) {
+ writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+ }
+
+ @Override
+ public void writeByteBuffer(ByteBuffer src) {
+ writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+ }
+
+ @FunctionalInterface
+ private interface IOExceptionThrowingRunnable {
+ void run() throws IOException;
+ }
+
+ private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+ try {
+ runnable.run();
+ } catch (IOException e) {
+ throw new RuntimeException("Writable encountered an IO error", e);
+ }
+ }
+
+ @Override
+ public void writeRecords(BaseRecords records) {
+ flush();
+ sendConsumer.accept(records.toSend(dest));
+ }
+
+ /**
+ * Flush any pending bytes as a ByteBufferSend and reset the buffer
+ */
+ public void flush() {
+ ByteBufferSend send = new ByteBufferSend(dest,
+ ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
Review comment:
@mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
```java
private void ensureCapacity(int minCapacity) {
    // overflow-conscious code
    int oldCapacity = buf.length;
    int minGrowth = minCapacity - oldCapacity;
    if (minGrowth > 0) {
        buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                minGrowth, oldCapacity /* preferred growth */));
    }
}
```
The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` growth makes sense if we do have a reasonable estimate (which is the case in current usage, I believe, but perhaps not in this case).
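To give a feel for why the growth factor matters when there is no size estimate, here is a small standalone sketch that counts reallocations (each one implies a copy of the whole buffer) for a given factor. The helper is invented for this example and does not model `ByteBufferOutputStream` exactly. It starts from 32 bytes, which is the default initial capacity of `ByteArrayOutputStream`.

```java
public class GrowthFactorSketch {
    /** Counts the reallocations needed to grow from initial capacity to at least target. */
    static int reallocations(int initial, int target, double factor) {
        int capacity = initial;
        int count = 0;
        while (capacity < target) {
            // Always grow by at least one byte so small capacities make progress
            capacity = Math.max(capacity + 1, (int) (capacity * factor));
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        int target = 1 << 20; // 1 MiB
        System.out.println("factor 2.0: " + reallocations(32, target, 2.0));
        System.out.println("factor 1.1: " + reallocations(32, target, 1.1));
    }
}
```

Doubling reaches 1 MiB from 32 bytes in 15 steps, while a `1.1` factor needs many more, which is why the smaller factor only makes sense when the initial size is already a reasonable estimate.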