Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/07/10 17:50:59 UTC

[GitHub] [kafka] mumrah opened a new pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

mumrah opened a new pull request #9008:
URL: https://github.com/apache/kafka/pull/9008


   This change makes use of the generated protocol classes for FetchRequest and FetchResponse. The main challenge was allowing the transferable bytes of the record set to be sent directly to the outgoing response without copying them into a buffer.
   
   The proposed solution is similar to the existing multi-send object used in [FetchResponse](TODO). However, a new writer class, [RecordsWriter](TODO), is introduced to allow interleaving ByteBufferSend objects (for headers and other non-record fields) with the RecordsSend objects that implement the efficient byte transfer.
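   
   For a concrete picture, here is a minimal sketch of how the pieces are intended to fit together (the helper method, its parameters, and the destination string are assumptions for this example, not code from the PR):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;
   
   import org.apache.kafka.common.network.Send;
   import org.apache.kafka.common.protocol.RecordsWriter;
   import org.apache.kafka.common.record.BaseRecords;
   import org.apache.kafka.common.record.MultiRecordsSend;
   
   // Build a response body whose record sets are transferred without copying.
   static Send buildResponse(String dest, int partitionCount, BaseRecords records) {
       Queue<Send> sends = new ArrayDeque<>();
       RecordsWriter writer = new RecordsWriter(dest, sends::add);
   
       writer.writeInt(partitionCount); // buffered into the pending byte stream
       writer.writeRecords(records);    // flushes pending bytes as a ByteBufferSend,
                                        // then queues the record set's zero-copy Send
       writer.flush();                  // flush any trailing non-record bytes
   
       return new MultiRecordsSend(dest, sends); // one Send covering the whole sequence
   }
   ```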
   
   Another change introduced here is that FetchRequest and FetchResponse no longer maintain their own copies of the fields from the message. Instead, they hold a reference to the generated message class (FetchRequestData and FetchResponseData). Read-only copies of different forms of the message data are created once, upon construction, to allow efficient access through the existing class methods.
   
   For example, in FetchRequest we hold the FetchRequestData, but also compute and hold:
   
   ```java
       private final FetchRequestData fetchRequestData;
   
       // These are immutable read-only structures derived from FetchRequestData
       private final Map<TopicPartition, PartitionData> fetchData;
       private final List<TopicPartition> toForget;
       private final FetchMetadata metadata;
   ``` 
   
   And in FetchResponse, we similarly hold:
   
   ```java
       private final FetchResponseData fetchResponseData;
       private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
   ```
   
   If we want, we could deprecate all the accessors on FetchRequest/FetchResponse and force callers to use the `#data()` method. This would eliminate the need for these additional data structures.
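   
   As a rough sketch of what callers would do instead (the accessor names follow the FetchRequestData fields used elsewhere in this PR, but the visitor helper itself is hypothetical):
   
   ```java
   import org.apache.kafka.common.message.FetchRequestData;
   import org.apache.kafka.common.requests.FetchRequest;
   
   // Hypothetical visitor: walk partitions straight off the generated message,
   // with no derived Map<TopicPartition, PartitionData> in between.
   static void forEachPartition(FetchRequest request, PartitionVisitor visitor) {
       for (FetchRequestData.FetchTopic topic : request.data().topics()) {
           for (FetchRequestData.FetchPartition partition : topic.partitions()) {
               visitor.visit(topic.topic(), partition.partition(), partition.fetchOffset());
           }
       }
   }
   
   @FunctionalInterface
   interface PartitionVisitor {
       void visit(String topic, int partition, long fetchOffset);
   }
   ```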
   
   Finally, most of the other changes fix up tests that were using invalid default values for protocol messages (now enforced, thanks to the generated classes), and rectify the JSON schema to match the actually defined `Schema`s (e.g., FETCH_RESPONSE_V11).





[GitHub] [kafka] cmccabe commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456224028



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       @mumrah : equality for the generated messages should mean bytewise equality.  So if two FetchResponseData instances contain the same data, they should be equal, even if one is using MemoryRecords and the other is using FileRecords.  Same for hashCode, of course.
   
   If it's too much trouble to change the Records class, you can just write a static utility method in MessageUtils and invoke it from the generated classes.  I expect that we won't be doing this kind of comparison except in tests, so you don't need to optimize the method too much.
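   
   For example, the static helper could look something like the following (a rough sketch only: the method name and placement are assumptions, and it deliberately materializes FileRecords into memory since, per the above, this comparison should only happen in tests):
   
   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   
   import org.apache.kafka.common.record.BaseRecords;
   import org.apache.kafka.common.record.FileRecords;
   import org.apache.kafka.common.record.MemoryRecords;
   
   // Bytewise equality across record implementations (MemoryRecords vs. FileRecords).
   public static boolean recordsEqual(BaseRecords a, BaseRecords b) {
       if (a == b)
           return true;
       if (a == null || b == null || a.sizeInBytes() != b.sizeInBytes())
           return false;
       return toBuffer(a).equals(toBuffer(b));
   }
   
   // Materialize the record set as a heap buffer; fine for tests, wasteful otherwise.
   private static ByteBuffer toBuffer(BaseRecords records) {
       if (records instanceof MemoryRecords)
           return ((MemoryRecords) records).buffer().duplicate();
       if (records instanceof FileRecords) {
           ByteBuffer buffer = ByteBuffer.allocate(records.sizeInBytes());
           try {
               ((FileRecords) records).readInto(buffer, 0);
           } catch (IOException e) {
               throw new RuntimeException(e);
           }
           return buffer;
       }
       throw new IllegalArgumentException("Unexpected records type: " + records.getClass());
   }
   ```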







[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r454664545



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {

Review comment:
       That's fair. I'm ok to leave it as is.







[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456692033



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Would `org.apache.kafka.common.utils.ByteBufferOutputStream` be useful here?







[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r458382935



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       I increased the initial buffer size to 64 and also added a 2x growth factor for the buffer. It occurred to me that the initial size only really helps for the first partition's header fields; beyond that (since we are reusing/growing the same ByteBufferOutputStream), we don't know what we'll need. The JMH benchmark did confirm that 2x was more performant than 1.1x for FetchResponse.
   
   Existing usages of ByteBufferOutputStream were not modified and still use the 1.1x factor.
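   
   Roughly, the growth policy amounts to the following (a sketch of the idea only; the actual ByteBufferOutputStream diff is not reproduced here):
   
   ```java
   import java.nio.ByteBuffer;
   
   // Double the capacity when the buffer fills rather than growing by 1.1x;
   // doubling wins when there is no good up-front estimate of the final size.
   static ByteBuffer expand(ByteBuffer buffer, int additionalRequired) {
       if (buffer.remaining() >= additionalRequired)
           return buffer;
       int newCapacity = Math.max(buffer.capacity() * 2,
                                  buffer.position() + additionalRequired);
       ByteBuffer grown = ByteBuffer.allocate(newCapacity);
       buffer.flip();
       grown.put(buffer); // copy the existing bytes; note any ByteBuffer mark is lost here
       return grown;
   }
   ```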







[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457416269



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBufferOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+    private int mark;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+        this.mark = 0;
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        try {
+            output.writeByte(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeShort(short val) {
+        try {
+            output.writeShort(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeInt(int val) {
+        try {
+            output.writeInt(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeLong(long val) {
+        try {
+            output.writeLong(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        try {
+            ByteUtils.writeDouble(val, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        try {
+            output.write(arr);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        try {
+            ByteUtils.writeUnsignedVarint(i, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        try {
+            output.write(src.array(), src.position(), src.remaining());
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBuffer buf = byteArrayOutputStream.buffer();
+        int end = buf.position();
+        int len = end - mark;
+
+        if (len > 0) {
+            buf.position(mark);
+            ByteBuffer slice = buf.slice();
+            slice.limit(len);
+            ByteBufferSend send = new ByteBufferSend(dest, slice);
+            sendConsumer.accept(send);
+        }
+
+        buf.position(end);
+        mark = end;

Review comment:
       I ended up having to keep a separate mark here since ByteBufferOutputStream doesn't keep the mark when it replaces the underlying buffer.
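   
   To see why (a standalone illustration, not PR code): the mark lives on the ByteBuffer instance itself, so it cannot survive the underlying buffer being swapped out during growth.
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.InvalidMarkException;
   
   public class MarkLossDemo {
       public static void main(String[] args) {
           ByteBuffer original = ByteBuffer.allocate(8);
           original.putInt(42);
           original.mark();           // mark recorded at position 4
   
           ByteBuffer replacement = ByteBuffer.allocate(16);
           original.flip();
           replacement.put(original); // bytes copied over, but the mark is not
   
           try {
               replacement.reset();   // throws: the replacement buffer has no mark
           } catch (InvalidMarkException expected) {
               // hence RecordsWriter keeps its own int mark alongside the stream
           }
       }
   }
   ```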







[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       Let's open a JIRA for getting rid of the toPartitionDataMap if we don't address it in this PR. It's a pretty large part of the cost here, and there are only a few places where we would have to deal with it.







[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456539488



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));

Review comment:
       This would only get called when we're writing an int8 field, I believe, but you're right that these lambdas could create unneeded object allocations. I'll rewrite them as simple try/catch statements.







[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453849419



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       No, I don't think they are designed to be compared. My main question was whether we can compare the same type (MemoryRecords to MemoryRecords). I think it should work in the case of `Objects.equals`, since it first checks whether the instances are the same. I don't think we have any use cases where we have equivalent records that are actually separate objects.







[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658955696


   > Added some basic JMH benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests use 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to better isolate the serialization time.
   > 
   > On `trunk`:
   > 
   > ```
   > Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   > FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30         3.591 ±      0.046  ns/op
   > FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  10049872.274 ± 440324.738  ns/op
   > FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30         1.911 ±      0.018  ns/op
   > FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  13693835.230 ± 150935.356  ns/op
   > ```
   > 
   > On this branch:
   > 
   > ```
   > Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   > FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30  4809813.661 ± 190773.702  ns/op
   > FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  4646758.697 ± 551449.969  ns/op
   > FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30  2507813.886 ±  17457.127  ns/op
   > FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  7231935.691 ± 461221.717  ns/op
   > ```
   > 
   > As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to the conversion to existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
   > 
   > FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.
   
   Nice improvement! Could you please rerun them both with `./jmh.sh -prof gc`? We should make sure that we are not increasing our garbage generation.





[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576


   Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.
   
   On this branch: 
   ```
   Benchmark                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score    Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer                                                           3            10  avgt   15   2110.741 ± 27.935   ns/op
   FetchRequestBenchmark.testFetchRequestForReplica                                                            3            10  avgt   15   2021.114 ±  7.816   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer                                                  3            10  avgt   15   3452.799 ± 16.013   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica                                                   3            10  avgt   15   3691.157 ± 60.260   ns/op
   
   GC Profile                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score    Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate                                            3            10  avgt   15   4295.532 ± 56.061  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm                                       3            10  avgt   15   9984.000 ±  0.001    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space                                   3            10  avgt   15   4292.525 ± 56.341  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                              3            10  avgt   15   9977.037 ± 28.311    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                               3            10  avgt   15      0.187 ±  0.027  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                          3            10  avgt   15      0.435 ±  0.060    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count                                                 3            10  avgt   15   2335.000           counts
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time                                                  3            10  avgt   15   1375.000               ms
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate                                             3            10  avgt   15   4416.855 ± 16.429  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm                                        3            10  avgt   15   9832.000 ±  0.001    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space                                    3            10  avgt   15   4417.032 ± 24.858  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                               3            10  avgt   15   9832.358 ± 28.932    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space                                3            10  avgt   15      0.186 ±  0.015  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                           3            10  avgt   15      0.415 ±  0.033    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count                                                  3            10  avgt   15   2280.000           counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.time                                                   3            10  avgt   15   1376.000               ms
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate                                   3            10  avgt   15   3256.172 ± 15.524  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm                              3            10  avgt   15  12384.000 ±  0.001    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space                          3            10  avgt   15   3255.019 ± 21.484  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                     3            10  avgt   15  12379.587 ± 49.161    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                      3            10  avgt   15      0.122 ±  0.022  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                 3            10  avgt   15      0.462 ±  0.084    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count                                        3            10  avgt   15   2054.000           counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time                                         3            10  avgt   15   1389.000               ms
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate                                    3            10  avgt   15   3319.965 ± 53.427  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm                               3            10  avgt   15  13496.000 ±  0.001    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space                           3            10  avgt   15   3320.125 ± 52.812  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                      3            10  avgt   15  13496.813 ± 64.774    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space                       3            10  avgt   15      0.126 ±  0.021  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                  3            10  avgt   15      0.512 ±  0.085    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count                                         3            10  avgt   15   2122.000           counts
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time     
   ```
   
   On trunk:
   ```
   Benchmark                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score     Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer                                                           3            10  avgt   15      3.457 ±   0.016   ns/op
   FetchRequestBenchmark.testFetchRequestForReplica                                                            3            10  avgt   15      3.453 ±   0.035   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer                                                  3            10  avgt   15  13214.306 ±  61.158   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica                                                   3            10  avgt   15  13147.870 ±  52.318   ns/op
   
   GC Profile                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score     Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate                                            3            10  avgt   15     ≈ 10⁻⁴            MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm                                       3            10  avgt   15     ≈ 10⁻⁶              B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count                                                 3            10  avgt   15        ≈ 0            counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate                                             3            10  avgt   15     ≈ 10⁻⁴            MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm                                        3            10  avgt   15     ≈ 10⁻⁶              B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count                                                  3            10  avgt   15        ≈ 0            counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate                                   3            10  avgt   15   1795.576 ±   8.351  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm                              3            10  avgt   15  26136.002 ±   0.005    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space                          3            10  avgt   15   1796.108 ±  11.527  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                     3            10  avgt   15  26143.702 ± 100.832    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                      3            10  avgt   15      0.163 ±   0.019  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                 3            10  avgt   15      2.366 ±   0.270    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count                                        3            10  avgt   15   2134.000            counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time                                         3            10  avgt   15   1412.000                ms
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate                                    3            10  avgt   15   1804.695 ±   7.193  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm                               3            10  avgt   15  26136.002 ±   0.005    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space                           3            10  avgt   15   1805.666 ±   7.990  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                      3            10  avgt   15  26150.127 ±  86.455    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space                       3            10  avgt   15      0.166 ±   0.016  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                  3            10  avgt   15      2.406 ±   0.238    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count                                         3            10  avgt   15   2097.000            counts
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time     
   ```





[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456662850



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2397,6 +2460,9 @@ private String fieldDefault(FieldSpec field) {
                 headerGenerator.addImport(MessageGenerator.BYTES_CLASS);
                 return "Bytes.EMPTY";
             }
+        } else if (field.type().isRecords()) {
+            // TODO should we use some special EmptyRecords class instead?

Review comment:
       Using null as the default seems reasonable to me. We also have `MemoryRecords.EMPTY` which we could use.
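   
   In generator terms, that would amount to a branch like the following (illustrative only; the import constant is an assumption, not existing generator code):
   
   ```java
   } else if (field.type().isRecords()) {
       // Mirror the Bytes.EMPTY case above: either a shared empty record set...
       headerGenerator.addImport(MessageGenerator.MEMORY_RECORDS_CLASS); // hypothetical constant
       return "MemoryRecords.EMPTY";
       // ...or simply: return "null";
   }
   ```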








[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       @mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
   
   ```java
   private void ensureCapacity(int minCapacity) {
           // overflow-conscious code
           int oldCapacity = buf.length;
           int minGrowth = minCapacity - oldCapacity;
           if (minGrowth > 0) {
               buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                       minGrowth, oldCapacity /* preferred growth */));
           }
   ```
   
   The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` growth makes sense if we do have a reasonable estimate (which is the case in current usage, I believe).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r454866708



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       @mumrah Have we considered dropping the `PartitionData` class entirely in favour of using `FetchRequestData.FetchPartition` directly in the broker? The main difference is that `FetchPartition` does not have an `Optional` for the leader epoch but returns the default value (-1) instead.
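   
    For example, a caller that still wants the `Optional` could bridge it with a small helper (hypothetical sketch, mirroring the conversion quoted above):
   
    ```java
    static Optional<Integer> currentLeaderEpoch(FetchRequestData.FetchPartition partition) {
        int epoch = partition.currentLeaderEpoch();
        return epoch == RecordBatch.NO_PARTITION_LEADER_EPOCH
            ? Optional.empty()
            : Optional.of(epoch);
    }
    ```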




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453846799



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {

Review comment:
       Are you suggesting a combined records reader+writer? `ByteBufferAccessor` is both `Readable` and `Writable`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456225427



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       That would mean loading data from disk to compute equals and hashCode for FileRecords. That's pretty unusual for such methods.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456197601



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       This creates a copy of the underlying bytes, can we avoid it?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455518434



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+    @Param({"1000"})
+    private int topicCount;
+
+    @Param({"20"})
+    private int partitionCount;
+
+    Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+    RequestHeader header;
+
+    FetchRequest request;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        this.fetchData = new HashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                    0, 0, 4096, Optional.empty());
+                fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+            }
+        }
+
+        this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+        this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());

Review comment:
       Changing our `hashCode` method massively improves the benchmark times, so I think the current benchmark results aren't really representative.
   
   ```
   --- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
   +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
   @@ -46,10 +46,7 @@ public final class TopicPartition implements Serializable {
        public int hashCode() {
            if (hash != 0)
                return hash;
   -        final int prime = 31;
   -        int result = 1;
   -        result = prime * result + partition;
   -        result = prime * result + Objects.hashCode(topic);
   +        int result = Objects.hash(topic, partition);
            this.hash = result;
            return result;
        }
   ```
   
    Edit: it looks like the main difference here is ordering by topic and then partition, which seems to avoid the collisions for this reasonably pathological case. Maybe we can just change the test case.
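   
    For reference, the two orderings expand to (illustrative arithmetic only):
   
    ```java
    // Current TopicPartition.hashCode(): partition is mixed in first.
    int partitionFirst = 31 * (31 + partition) + Objects.hashCode(topic);
    // Objects.hash(topic, partition): topic is mixed in first.
    int topicFirst = 31 * (31 + Objects.hashCode(topic)) + partition;
    ```
   
    For the generated names in this benchmark, topic-hash differences are often exact multiples of 31, which the `31 * partition` stride can cancel in the first form but not in the second.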




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456092037



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+     * classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       Perhaps instead we could add this to a mixin type. Then if we find cases where getting access to the `ApiMessage` generally would be useful, we could just use `instanceof` checks. These would ultimately go away after the conversions are finished.
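   
    A rough sketch of that shape (the interface name here is illustrative, not from the PR):
   
    ```java
    public interface ApiMessageProvider {
        ApiMessage data();
    }
    
    // Call site during the migration period:
    if (response instanceof ApiMessageProvider) {
        ApiMessage message = ((ApiMessageProvider) response).data();
        // work with the generated message directly
    }
    ```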




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656988853


   I agree that it’d be great to have a benchmark on both the request and response side.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456727715



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Looks like the expansion factor for ByteArrayOutputStream varies on the JDK version. In [JDK 8](https://github.com/openjdk/jdk/blob/jdk8-b120/jdk/src/share/classes/java/io/ByteArrayOutputStream.java#L105) and [11](https://github.com/openjdk/jdk/blob/jdk-11+28/src/java.base/share/classes/java/io/ByteArrayOutputStream.java) it's 2x, but in [JDK 14](https://github.com/openjdk/jdk/blob/jdk-14+36/src/java.base/share/classes/java/io/ByteArrayOutputStream.java#L95) it just grows the buffer to the minimum needed size.
   
    Our growth factor of 1.1 in `ByteBufferOutputStream` seems reasonable. Avoiding the final copy by using `slice` would be nice too.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665938546






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455342535



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+    @Param({"1000"})
+    private int topicCount;
+
+    @Param({"20"})
+    private int partitionCount;
+
+    Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+    RequestHeader header;
+
+    FetchRequest request;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        this.fetchData = new HashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                    0, 0, 4096, Optional.empty());
+                fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+            }
+        }
+
+        this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+        this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());

Review comment:
       Can you also try rerunning the benchmark with random topic names, e.g. `UUID.randomUUID().toString()`, and compare it to the existing topic names? I think our `hashCode` implementation sucks and we are seeing a lot of collisions.
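   
    For instance, the setup loop could be changed along these lines (sketch):
   
    ```java
    // Random topic names to spread out the hash values (uses java.util.UUID).
    String topicName = UUID.randomUUID().toString();
    fetchData.put(new TopicPartition(topicName, partitionId), partitionData);
    ```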




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455318129



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+    @Param({"1000"})
+    private int topicCount;
+
+    @Param({"20"})
+    private int partitionCount;
+
+    Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+    RequestHeader header;
+
+    FetchRequest request;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        this.fetchData = new HashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                    0, 0, 4096, Optional.empty());
+                fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+            }
+        }
+
+        this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+        this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());

Review comment:
       Can we measure both `forConsumer` and `forReplica` fetch requests?
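   
    For example (sketch; the `forReplica` parameters are assumed from the existing builder API):
   
    ```java
    FetchRequest replicaRequest = FetchRequest.Builder
        .forReplica(ApiKeys.FETCH.latestVersion(), 1, 0, 0, fetchData)
        .build(ApiKeys.FETCH.latestVersion());
    ```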




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       Let's open a JIRA for getting rid of `toPartitionDataMap` if we don't address it in this PR. It's a pretty large part of the cost here, and there are only a few places we would have to deal with it. I think we should fix it sooner rather than later, too.
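   
    For reference, the broker could then iterate the generated structures directly, roughly like this (sketch; accessor names assumed from the generated `FetchRequestData`):
   
    ```java
    for (FetchRequestData.FetchTopic topic : fetchRequestData.topics()) {
        for (FetchRequestData.FetchPartition partition : topic.partitions()) {
            TopicPartition tp = new TopicPartition(topic.topic(), partition.partition());
            // handle the fetch using partition.fetchOffset(), partition.partitionMaxBytes(), ...
        }
    }
    ```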




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658918847


   Added some basic JMH benchmarks. Here are the preliminary results (run on my laptop, so take them with a grain of salt). All of these tests use 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to try to isolate the serialization time.
   
   On `trunk`:
   
   ```
   Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30         3.591 ±      0.046  ns/op
   FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  10049872.274 ± 440324.738  ns/op
   FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30         1.911 ±      0.018  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  13693835.230 ± 150935.356  ns/op
   ```
   
   On this branch:
   
   ```
   Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30  4809813.661 ± 190773.702  ns/op
   FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  4646758.697 ± 551449.969  ns/op
   FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30  2507813.886 ±  17457.127  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  7231935.691 ± 461221.717  ns/op
   ```
   
   As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to conversion to the existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
   
   FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455361855



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+    @Param({"1000"})

Review comment:
       Let's add a smaller topic or partition count param benchmark. 20,000 partitions in a fetch request is larger than we would normally see :)
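   
    Something like this, for instance (values illustrative):
   
    ```java
    @Param({"10", "500", "1000"})
    private int topicCount;
    
    @Param({"3", "10", "20"})
    private int partitionCount;
    ```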




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453095731



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);

Review comment:
       Probably better to save for a follow-up, but potentially we can get rid of this conversion by using `FetchablePartitionResponse` directly in the broker. 

##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -49,41 +49,41 @@
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
-    { "name": "MaxWaitMs", "type": "int32", "versions": "0+",
+    { "name": "MaxWaitTime", "type": "int32", "versions": "0+",

Review comment:
       Can we revert some of these renamings? We intentionally changed them in #8802. 

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -209,30 +210,21 @@
             FORGOTTEN_TOPIC_DATA_V7,
             RACK_ID);

Review comment:
       Similarly, we can get rid of all this.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.fetchResponseData = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return fetchResponseData.toStruct(version);
     }
 
     @Override
     protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        RecordsWriter writer = new RecordsWriter(dest, sends::add);
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        fetchResponseData.size(cache, apiVersion);
+        fetchResponseData.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        //Struct responseHeaderStruct = responseHeader.toStruct();

Review comment:
       nit: I guess we didn't need this?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
             SESSION_ID,
             new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
 
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
-            FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10,
-            FETCH_RESPONSE_V11};
-    }
-
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
-    private final int throttleTimeMs;
-    private final Errors error;
-    private final int sessionId;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseData;
+    private final FetchResponseData fetchResponseData;

Review comment:
       nit: in all of the other classes, we just use the name `data`. Can we do the same here?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -218,23 +220,19 @@
             SESSION_ID,
             new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));

Review comment:
       We can get rid of all the stuff above too, right?

##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       I don't think `FileRecords` and `MemoryRecords` instances can be compared directly, if that's what the question is about.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {

Review comment:
       Is it worth extending `ByteBufferAccessor` or not?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +368,164 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.fetchResponseData = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.fetchResponseData = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return fetchResponseData.toStruct(version);
     }
 
     @Override
     protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        RecordsWriter writer = new RecordsWriter(dest, sends::add);

Review comment:
       Pretty nice if this is all the manual code we need. If we wanted to go a little further, we could push `toSend` into the generated class as well. That will be necessary if we ever want to get rid of the current `AbstractRequest` and `AbstractResponse` types and replace them with the generated data classes (which was always the plan). However, I think this could be left for follow-up work.
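   For illustration, a rough sketch of what a generated `toSend` could look like (hypothetical shape, not code from this PR; it mostly mirrors the manual code above, and the size-prefix/header Send is elided):
   ```java
   // Hypothetical generated method on FetchResponseData (sketch only):
   public Send toSend(String dest, short version) {
       ArrayDeque<Send> sends = new ArrayDeque<>();
       RecordsWriter writer = new RecordsWriter(dest, sends::add);
       ObjectSerializationCache cache = new ObjectSerializationCache();
       size(cache, version);          // populate the serialization cache used by write()
       write(writer, cache, version); // interleaves ByteBufferSends and RecordsSends
       writer.flush();                // flush any trailing non-record bytes
       // a leading Send with the size prefix and response header would be added here
       return new MultiRecordsSend(dest, sends);
   }
   ```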




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456449035



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+        FetchRequestData message = new FetchRequestData();
+        message.read(accessor, version);
+        return new FetchRequest(message, version);
+    }
+
+    @Override
+    public ByteBuffer serialize(RequestHeader header) {

Review comment:
       Indeed this is generic serialization code for the message classes. If we go with a mixin interface to indicate a class has been converted over to generated messages, we could also push this up to AbstractRequest. However, this might be better saved for a follow-on since we'll probably want to pick up additional changes from @ijuma's PR. Thoughts?




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456730580



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2397,6 +2460,9 @@ private String fieldDefault(FieldSpec field) {
                 headerGenerator.addImport(MessageGenerator.BYTES_CLASS);
                 return "Bytes.EMPTY";
             }
+        } else if (field.type().isRecords()) {
+            // TODO should we use some special EmptyRecords class instead?

Review comment:
       Ok, going to leave it as `null` since we need to deal with FileRecords and MemoryRecords in different cases.
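   For the record, the resulting branch in `fieldDefault` would then presumably be just (sketch, mirroring the `Bytes.EMPTY` branch above):
   ```java
   } else if (field.type().isRecords()) {
       return "null";
   }
   ```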




[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456195934



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));

Review comment:
       This seems pretty inefficient (creating a lambda for each `byte` we write). Are we sure about it?




[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456660379



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+        FetchRequestData message = new FetchRequestData();
+        message.read(accessor, version);
+        return new FetchRequest(message, version);
+    }
+
+    @Override
+    public ByteBuffer serialize(RequestHeader header) {

Review comment:
       I'm ok saving this for #7409.




[GitHub] [kafka] cmccabe commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662745982


   Thanks for this, @mumrah.  I took a look at the overall approach with the `RecordsWriter` and it looks reasonable.
   
   Do we need `RecordsReader`?  Seems like we could just add the `readRecords` method to `ByteBufferAccessor`.
   
   I do think `RecordsWriter` needs to be a separate class from `ByteBufferAccessor` -- it is doing something quite different, after all.  But I'm not sure that the generated code needs to know about `RecordsWriter`.  If we add a `writeRecords` method to `Writable` and a simple implementation to `ByteBufferAccessor`, we can avoid the downcast in the generated code.  That also suggests that maybe `RecordsWriter` could be in `org.apache.kafka.common.record`?  Maybe.
   
   It seems like `Writable` should have a `Writable#close` method, in case there's something the writable needs to do when we're done adding things.  Actually it should just extend AutoCloseable so that the compiler will complain if we don't close it appropriately.  Then that can be a no-op for `ByteBufferAccessor` but call flush when using `RecordsWriter`.
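   
   Roughly, the shape could be something like this (a sketch of the suggestion; the `writeRecords`/`readRecords` signatures and the `buf` field name are assumptions):
   ```java
   public interface Writable extends AutoCloseable {
       // ...existing writeByte/writeShort/writeInt/etc...
       void writeRecords(BaseRecords records);
   
       // no-op for ByteBufferAccessor; RecordsWriter would flush pending bytes here
       @Override
       default void close() { }
   }
   
   // Simple implementations on ByteBufferAccessor, avoiding both the downcast in
   // generated code and the separate RecordsReader class:
   @Override
   public void writeRecords(BaseRecords records) {
       if (records instanceof MemoryRecords)
           writeByteBuffer(((MemoryRecords) records).buffer());
       else
           throw new UnsupportedOperationException("Cannot write " + records.getClass());
   }
   
   public MemoryRecords readRecords(int length) {
       ByteBuffer slice = buf.slice();
       slice.limit(length);
       buf.position(buf.position() + length);
       return MemoryRecords.readableRecords(slice);
   }
   ```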
   
   Using `ByteBufferOutputStream` is wasteful when you have to do a lot of doublings.  When you do a doubling, you end up copying a lot of data that has already been flushed (and hence sent to the Sender).  You're making a new buffer to contain it, but why?  Nobody will ever read that part of the new buffer.  What the Sender will read is the part of the old (pre-doubling) buffer that contained that data.
   
   What you really want is to get rid of the ByteBufferOutputStream and just manage the buffer yourself here.  Then, when you need to enlarge, you can just copy the data that's live and not the old, already flushed data.
   
   The above could be done in a follow-on if you want.  I don't think it should block the merge.
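   
   A minimal sketch of the manual-buffer idea (illustrative names; error handling elided):
   ```java
   private ByteBuffer buf = ByteBuffer.allocate(32);
   private int mark = 0; // start of the bytes written since the last flush
   
   private void ensureRoomFor(int bytes) {
       if (buf.remaining() >= bytes)
           return;
       int live = buf.position() - mark; // only the unflushed bytes still matter
       ByteBuffer grown = ByteBuffer.allocate(Math.max(2 * buf.capacity(), live + bytes));
       ByteBuffer liveView = buf.duplicate();
       liveView.limit(buf.position());
       liveView.position(mark);
       grown.put(liveView); // already-flushed bytes remain reachable through their Sends
       buf = grown;
       mark = 0;
   }
   ```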


[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453849419



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       No, I don't think they are designed to be compared. My main question was whether we can compare instances of the same type (MemoryRecords to MemoryRecords). I think it should work in the case of `Objects.equals`, since it first checks whether the two references are the same instance. I don't think we have any use cases where we have equivalent records that are actually separate objects.
   
   I have a similar question about hashCode down below. Records doesn't implement either of these, but we have to include them for all fields in the generated message classes for completeness. I think it's probably fine. 
   
   @cmccabe, any insight here?
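   
   To make the `Objects.equals` point concrete, a standalone illustration (not code from this PR):
   ```java
   import org.apache.kafka.common.record.MemoryRecords;
   
   import java.nio.ByteBuffer;
   import java.util.Objects;
   
   public class RecordsEqualsCheck {
       public static void main(String[] args) {
           MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(0));
           // Objects.equals short-circuits on reference equality, so comparing a field
           // against itself (or two null record-set fields) is safe regardless of how
           // the records classes themselves define equals().
           System.out.println(Objects.equals(records, records)); // true
           System.out.println(Objects.equals(null, null));       // true
       }
   }
   ```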




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r459499911



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBufferOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+    private int mark;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+        this.mark = 0;
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        try {
+            output.writeByte(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeShort(short val) {
+        try {
+            output.writeShort(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeInt(int val) {
+        try {
+            output.writeInt(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeLong(long val) {
+        try {
+            output.writeLong(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        try {
+            ByteUtils.writeDouble(val, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        try {
+            output.write(arr);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        try {
+            ByteUtils.writeUnsignedVarint(i, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        try {
+            output.write(src.array(), src.position(), src.remaining());
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBuffer buf = byteArrayOutputStream.buffer();
+        int end = buf.position();
+        int len = end - mark;
+
+        if (len > 0) {
+            buf.position(mark);
+            ByteBuffer slice = buf.slice();
+            slice.limit(len);
+            ByteBufferSend send = new ByteBufferSend(dest, slice);
+            sendConsumer.accept(send);
+        }
+
+        buf.position(end);
+        mark = end;

Review comment:
       Ended up not using ByteBufferOutputStream.




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456444794



##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -47,33 +47,35 @@
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",

Review comment:
       I was trying to keep the field names aligned with what was defined in the manual schemas, so I'm not sure we should change any other field names in this PR.




[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456091241



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+     * classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       I have a PR that does this. I really need to get it over the line.




[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656809268


   Thanks for the PR. Since this affects the fetch path, let's make sure we benchmark this. cc @lbradstreet 


[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+     * classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach. 

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1249,26 +1249,26 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
                     }
                 }
 
-                if (partition.highWatermark >= 0) {
-                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark);
-                    subscriptions.updateHighWatermark(tp, partition.highWatermark);
+                if (partition.highWatermark() >= 0) {
+                    log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark());
+                    subscriptions.updateHighWatermark(tp, partition.highWatermark());
                 }
 
-                if (partition.logStartOffset >= 0) {
-                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset);
-                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset);
+                if (partition.logStartOffset() >= 0) {
+                    log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset());
+                    subscriptions.updateLogStartOffset(tp, partition.logStartOffset());
                 }
 
-                if (partition.lastStableOffset >= 0) {
-                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
-                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
+                if (partition.lastStableOffset() >= 0) {
+                    log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset());
+                    subscriptions.updateLastStableOffset(tp, partition.lastStableOffset());
                 }
 
-                if (partition.preferredReadReplica.isPresent()) {
-                    subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> {
+                if (partition.preferredReadReplica().isPresent()) {

Review comment:
       nit: could probably change this to use `ifPresent`
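   i.e., something along these lines (sketch, callback body elided):
   ```java
   partition.preferredReadReplica().ifPresent(readReplica ->
       subscriptions.updatePreferredReadReplica(completedFetch.partition, readReplica, () -> {
           // ...existing expiration-time callback...
       }));
   ```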

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+        FetchRequestData message = new FetchRequestData();
+        message.read(accessor, version);
+        return new FetchRequest(message, version);
+    }
+
+    @Override
+    public ByteBuffer serialize(RequestHeader header) {

Review comment:
       Are we overriding this so that we save the conversion to `Struct`? As far as I can tell, there's nothing specific to `FetchRequest` below. I wonder if we can move this implementation to `AbstractRequest.serialize` so that we save the conversion to Struct for all APIs that have been converted?
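   A sketch of what the shared implementation could look like (assuming converted classes expose `data()`, and that `RequestHeader` exposes `data()`/`headerVersion()` the way `ResponseHeader` does in this PR):
   ```java
   public ByteBuffer serialize(RequestHeader header) {
       ObjectSerializationCache cache = new ObjectSerializationCache();
       RequestHeaderData headerData = header.data();
       short headerVersion = header.headerVersion();
       short apiVersion = version();
       int size = headerData.size(cache, headerVersion) + data().size(cache, apiVersion);
       ByteBuffer buffer = ByteBuffer.allocate(size);
       ByteBufferAccessor writer = new ByteBufferAccessor(buffer);
       headerData.write(writer, cache, headerVersion);
       data().write(writer, cache, apiVersion);
       buffer.flip();
       return buffer;
   }
   ```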

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);

Review comment:
       In the parsing logic, we still convert to struct first before calling `AbstractRequest.parseRequest`. I think we could bypass the `Struct` conversion by changing `AbstractRequest.parseRequest` to take the `ByteBuffer` instead of the `Struct`. 
   ```java
       public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {
   ```
   Then in the fetch case, we could just call this method.
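   e.g., a sketch of the dispatch (only FETCH is converted so far; everything else falls back to the existing Struct-based overload):
   ```java
   public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, ByteBuffer buffer) {
       switch (apiKey) {
           case FETCH:
               // bypasses the Struct conversion entirely
               return FetchRequest.parse(buffer, apiVersion);
           default:
               return parseRequest(apiKey, apiVersion, apiKey.parseRequest(apiVersion, buffer));
       }
   }
   ```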




[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457416269



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBufferOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+    private int mark;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteBufferOutputStream(32);
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+        this.mark = 0;
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        try {
+            output.writeByte(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeShort(short val) {
+        try {
+            output.writeShort(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeInt(int val) {
+        try {
+            output.writeInt(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeLong(long val) {
+        try {
+            output.writeLong(val);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        try {
+            ByteUtils.writeDouble(val, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        try {
+            output.write(arr);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        try {
+            ByteUtils.writeUnsignedVarint(i, output);
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        try {
+            output.write(src.array(), src.position(), src.remaining());
+        } catch (IOException e) {
+            throw new RuntimeException("RecordsWriter encountered an IO error", e);
+        }
+    }
+
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBuffer buf = byteArrayOutputStream.buffer();
+        int end = buf.position();
+        int len = end - mark;
+
+        if (len > 0) {
+            buf.position(mark);
+            ByteBuffer slice = buf.slice();
+            slice.limit(len);
+            ByteBufferSend send = new ByteBufferSend(dest, slice);
+            sendConsumer.accept(send);
+        }
+
+        buf.position(end);
+        mark = end;

Review comment:
       I ended up having to keep a separate mark here since ByteBufferOutputStream doesn't keep the mark when it replaces the underlying buffer. I also didn't want to mess with that class in this PR since it has quite a lot of usages. We could look into fixing that as a follow-on.




[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665845747


   @ijuma you're right, I meant the consumer perf test. I updated my comment to clarify.


[GitHub] [kafka] dajac commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456447554



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       Yeah, `Optional` support would be awesome. I was actually thinking about how to do it. I may give it a shot over the weekend ;)




[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       @mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
   
   ```java
   private void ensureCapacity(int minCapacity) {
           // overflow-conscious code
           int oldCapacity = buf.length;
           int minGrowth = minCapacity - oldCapacity;
           if (minGrowth > 0) {
               buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                       minGrowth, oldCapacity /* preferred growth */));
           }
   ```
   
   The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` makes sense if we do have an estimate (which is the case in current usage, I believe).
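
    For intuition, here's a standalone sketch (not from the JDK or this PR) counting how many array copies each growth policy incurs while a buffer grows to a target size, assuming a 32-byte starting capacity:

    ```java
    // Compares reallocation counts under doubling (the JDK ByteArrayOutputStream
    // policy quoted above) vs. the 1.1x policy discussed for ByteBufferOutputStream.
    public class GrowthPolicyDemo {
        static int copies(long target, double factor, long capacity) {
            int copies = 0;
            while (capacity < target) {
                capacity = Math.max(capacity + 1, (long) (capacity * factor));
                copies++;
            }
            return copies;
        }

        public static void main(String[] args) {
            long target = 64L * 1024 * 1024; // grow to 64 MiB
            System.out.println("2.0x growth: " + copies(target, 2.0, 32) + " copies"); // 21
            System.out.println("1.1x growth: " + copies(target, 1.1, 32) + " copies"); // ~153
        }
    }
    ```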







[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456450107



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -492,74 +327,51 @@ public int maxBytes() {
     }
 
     public boolean isFromFollower() {
-        return replicaId >= 0;
+        return replicaId() >= 0;
     }
 
     public IsolationLevel isolationLevel() {
-        return isolationLevel;
+        return IsolationLevel.forId(data.isolationLevel());
     }
 
     public FetchMetadata metadata() {
         return metadata;
     }
 
     public String rackId() {
-        return rackId;
+        return data.rackId();
     }
 
     public static FetchRequest parse(ByteBuffer buffer, short version) {
-        return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);

Review comment:
       I believe this is also addressed in @ijuma's PR. 







[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662805571


   Latest FetchResponse benchmark
   
   trunk
   ```
   Benchmark                                          (partitionCount)  (topicCount)  Mode  Cnt      Score      Error  Units
   FetchResponseBenchmark.testConstructFetchResponse                 3            10  avgt   15      2.126 ±    0.408  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse                 3            10  avgt   15  19753.993 ± 3668.755  ns/op
   JMH benchmarks done
   ```
   
   this branch
   ```
   Benchmark                                          (partitionCount)  (topicCount)  Mode  Cnt     Score     Error  Units
   FetchResponseBenchmark.testConstructFetchResponse                 3            10  avgt   15  1165.485 ±  62.632  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse                 3            10  avgt   15  6468.729 ± 230.405  ns/op
   JMH benchmarks done
   ```
   
   So a pretty good reduction, overall.





[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947


   I ran the consumer perf test (at @hachikuji's suggestion) and took a profile. Throughput was around 500MB/s both on trunk and on this branch.
   
   ![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
   
   Zoomed in a bit on the records part:
   
   ![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
   
   This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code.
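
    For context on why the profile bottoms out in sendfile: the records portion of the response is a file-backed Send whose transfer reduces to `FileChannel#transferTo`, which Linux implements with sendfile(2). A minimal sketch (not Kafka code) of that zero-copy hop:

    ```java
    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.channels.WritableByteChannel;

    final class ZeroCopySketch {
        // Bytes move from the log segment's file channel straight to the socket
        // channel without being copied into the JVM heap.
        static long transferRecords(FileChannel log, long position, long count,
                                    WritableByteChannel socket) throws IOException {
            return log.transferTo(position, count, socket);
        }
    }
    ```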
   
   





[GitHub] [kafka] ijuma commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665828744


   What were the throughput numbers? I assume you meant the consumer perf test, not the console consumer.





[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666374043


   retest this please





[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457513713



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Looking a bit more, it seems like this will be mostly used by the data that precedes the actual records. Do we have a sense for what's the typical size for that? If we do, we can use that in the initial size and we can keep the `1.1` growth.
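
    For reference, a sketch of deriving that size up front as the schema-computed message size minus the record payload (the shape the updated patch takes in `FetchResponse#toSend` later in this thread); `data`, `cache`, `apiVersion`, `dest`, and `sends` are assumed to be in scope as in the PR:

    ```java
    // Non-record overhead = full message size minus the record sets, so the
    // writer's buffer can be allocated exactly and growth rarely triggers.
    int totalRecordSize = data.responses().stream()
            .flatMap(topic -> topic.partitionResponses().stream())
            .mapToInt(partition -> partition.recordSet().sizeInBytes())
            .sum();
    int nonRecordSize = data.size(cache, apiVersion) - totalRecordSize;
    RecordsWriter writer = new RecordsWriter(dest, nonRecordSize, sends::add);
    ```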







[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-658918847


   Added some basic JMH benchmarks. Here are the preliminary results (run on my laptop, so take with a grain of salt). All these tests use 1000 topics with 20 partitions each. For FetchResponse, I used static MemoryRecords rather than FileRecords to better isolate the serialization time.
   
   On `trunk`:
   
   ```
   Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30         3.591 ±      0.046  ns/op
   FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  10049872.274 ± 440324.738  ns/op
   FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30         1.911 ±      0.018  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  13693835.230 ± 150935.356  ns/op
   ```
   
   On this branch:
   
   ```
   Benchmark                                        (partitionCount)  (topicCount)  Mode  Cnt        Score        Error  Units
   FetchRequestBenchmark.testConstructFetchRequest                20          1000  avgt   30  4809813.661 ± 190773.702  ns/op
   FetchRequestBenchmark.testSerializeFetchRequest                20          1000  avgt   30  4646758.697 ± 551449.969  ns/op
   FetchResponseBenchmark.testConstructFetchResponse              20          1000  avgt   30  2507813.886 ±  17457.127  ns/op
   FetchResponseBenchmark.testSerializeFetchResponse              20          1000  avgt   30  7231935.691 ± 461221.717  ns/op
   ```
   
   As we expected, quite a bit more time is spent during the construction of FetchRequest/FetchResponse due to the conversion to existing data structures. We also see a reduction in serialization time since we no longer convert to `Struct` first.
   
   FetchRequest total construction+serialization time is about the same before and after the change, and FetchResponse total time is slightly less after the change.
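
    For anyone reproducing these numbers, here is a minimal JMH skeleton of the serialization case. The setter names are inferred from the FetchResponse.json schema quoted in this thread, so treat them as illustrative rather than exact:

    ```java
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.common.message.FetchResponseData;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.openjdk.jmh.annotations.*;

    @State(Scope.Benchmark)
    @BenchmarkMode(Mode.AverageTime)
    @OutputTimeUnit(TimeUnit.NANOSECONDS)
    public class FetchResponseSerializationSketch {
        private FetchResponseData data;

        @Setup(Level.Trial)
        public void setup() {
            // A single topic/partition with empty MemoryRecords, to isolate
            // field serialization from record transfer as described above.
            FetchResponseData.FetchablePartitionResponse partition =
                new FetchResponseData.FetchablePartitionResponse()
                    .setPartitionHeader(new FetchResponseData.PartitionHeader()
                        .setPartition(0)
                        .setHighWatermark(100L))
                    .setRecordSet(MemoryRecords.EMPTY);
            data = new FetchResponseData().setResponses(Collections.singletonList(
                new FetchResponseData.FetchableTopicResponse()
                    .setTopic("topic-0")
                    .setPartitionResponses(Collections.singletonList(partition))));
        }

        @Benchmark
        public Object serializeFetchResponse() {
            return data.toStruct(ApiKeys.FETCH.latestVersion());
        }
    }
    ```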





[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455094448



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       Yes, I think it's a good idea. However, it would expand the scope of this change quite a bit. I'm working on some micro benchmarks now, and if we don't have any apparent regressions then I'll save this for a follow-on PR. 
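
    As a sketch of what that follow-on could look like (accessor names assumed), the conversion could be memoized so that callers which never touch the map view skip the cost entirely:

    ```java
    // Lazily build and cache the map view on first access instead of in the
    // constructor; `data` is the FetchRequestData backing this request.
    private Map<TopicPartition, PartitionData> fetchData;

    public synchronized Map<TopicPartition, PartitionData> fetchData() {
        if (fetchData == null)
            fetchData = toPartitionDataMap(data.topics());
        return fetchData;
    }
    ```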







[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-660399783


   Recent test failures are due to removal of the static `parse` method on FetchRequest (it's only used via serialization in a test, so IntelliJ "usages" didn't catch it).





[GitHub] [kafka] abbccdda commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r453856418



##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -47,33 +47,35 @@
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",

Review comment:
       s/Responses/TopicResponses?







[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457562415



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Sounds good.







[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162



##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
-    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
+    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,

Review comment:
       I guess the implicit expectation is that if the protocol does not support the `read_committed` isolation level, then it wouldn't have transactional data anyway, so reverting to `read_uncommitted` is safe. Can't find a fault with that.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
             case PRODUCE:
                 return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                return new FetchRequest(struct, apiVersion);
+                return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);

Review comment:
       nit: any reason not to stick with the same constructor convention as the other requests?

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBuffer buffer;
+    private int mark;
+
+    public RecordsWriter(String dest, int totalSize, Consumer<Send> sendConsumer) {

Review comment:
       Could we rename `totalSize` so that it is clear that it does not cover the record sizes. Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {

Review comment:
       nit: wonder if this should be `RecordsWritable` for consistency with `Writable`.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+    private final ByteBuffer buf;
+
+    public RecordsReader(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.get();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.getShort();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.getLong();
+    }
+
+    @Override
+    public double readDouble() {
+        return ByteUtils.readDouble(buf);
+    }
+
+    @Override
+    public void readArray(byte[] arr) {
+        buf.get(arr);
+    }
+
+    @Override
+    public int readUnsignedVarint() {
+        return ByteUtils.readUnsignedVarint(buf);
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer(int length) {

Review comment:
       More of a side question, but is this length guaranteed to be less than the buffer size? Wondering if it is worth adding range checking.
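
    A sketch of what a range-checked variant might look like (the exception type here is a placeholder, not the PR's choice):

    ```java
    @Override
    public ByteBuffer readByteBuffer(int length) {
        if (length < 0 || length > buf.remaining())
            throw new RuntimeException("Expected to read " + length +
                " bytes, but only " + buf.remaining() + " remain");
        ByteBuffer res = buf.slice();   // view starting at the current position
        res.limit(length);              // cap the view at the requested length
        buf.position(buf.position() + length); // consume the bytes from buf
        return res;
    }
    ```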

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();

Review comment:
       nit: not a big deal, but I feel like calling `flush` should really be the responsibility of `write`.
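
    One shape that could take is a wrapper on the writer itself, so call sites cannot forget the trailing flush (method name is illustrative, assuming the generated `Message` interface in the same package):

    ```java
    // Serialize a message and flush any pending trailing bytes in one call.
    public void writeMessage(Message message, ObjectSerializationCache cache, short version) {
        message.write(this, cache, version);
        flush();
    }
    ```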

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+        int bodySize = (int) sends.stream().mapToLong(Send::size).sum();

Review comment:
       Instead of the cast, could we add a validation check?
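
    For example (a sketch, not the patch itself):

    ```java
    long bodySizeLong = sends.stream().mapToLong(Send::size).sum();
    // Fail fast on overflow instead of silently truncating with a cast.
    int bodySize = Math.toIntExact(bodySizeLong);
    ```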

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+     * classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       @mumrah Do we need this for this PR or can we leave this for #7409?

##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
-            "about": "The producer id associated with the aborted transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+              "about": "The producer id associated with the aborted transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true,

Review comment:
       I'm wondering if this should be ignorable. When this is set, the leader returns no data, so it relies crucially on the follower redirection.








[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576


   Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.
   
   On this branch: 
   ```
   Benchmark                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score    Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer                                                           3            10  avgt   15   2110.741 ± 27.935   ns/op
   FetchRequestBenchmark.testFetchRequestForReplica                                                            3            10  avgt   15   2021.114 ±  7.816   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer                                                  3            10  avgt   15   3452.799 ± 16.013   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica                                                   3            10  avgt   15   3691.157 ± 60.260   ns/op
   
   GC Profile                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score    Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate                                            3            10  avgt   15   4295.532 ± 56.061  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm                                       3            10  avgt   15   9984.000 ±  0.001    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space                                   3            10  avgt   15   4292.525 ± 56.341  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                              3            10  avgt   15   9977.037 ± 28.311    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                               3            10  avgt   15      0.187 ±  0.027  MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                          3            10  avgt   15      0.435 ±  0.060    B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count                                                 3            10  avgt   15   2335.000           counts
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time                                                  3            10  avgt   15   1375.000               ms
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate                                             3            10  avgt   15   4416.855 ± 16.429  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm                                        3            10  avgt   15   9832.000 ±  0.001    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space                                    3            10  avgt   15   4417.032 ± 24.858  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                               3            10  avgt   15   9832.358 ± 28.932    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space                                3            10  avgt   15      0.186 ±  0.015  MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                           3            10  avgt   15      0.415 ±  0.033    B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count                                                  3            10  avgt   15   2280.000           counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.time                                                   3            10  avgt   15   1376.000               ms
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate                                   3            10  avgt   15   3256.172 ± 15.524  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm                              3            10  avgt   15  12384.000 ±  0.001    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space                          3            10  avgt   15   3255.019 ± 21.484  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                     3            10  avgt   15  12379.587 ± 49.161    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                      3            10  avgt   15      0.122 ±  0.022  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                 3            10  avgt   15      0.462 ±  0.084    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count                                        3            10  avgt   15   2054.000           counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time                                         3            10  avgt   15   1389.000               ms
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate                                    3            10  avgt   15   3319.965 ± 53.427  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm                               3            10  avgt   15  13496.000 ±  0.001    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space                           3            10  avgt   15   3320.125 ± 52.812  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                      3            10  avgt   15  13496.813 ± 64.774    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space                       3            10  avgt   15      0.126 ±  0.021  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                  3            10  avgt   15      0.512 ±  0.085    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count                                         3            10  avgt   15   2122.000           counts
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time                                          3            10  avgt   15   1395.000               ms
   ```
   
   On trunk:
   ```
   Benchmark                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score     Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer                                                           3            10  avgt   15      3.457 ±   0.016   ns/op
   FetchRequestBenchmark.testFetchRequestForReplica                                                            3            10  avgt   15      3.453 ±   0.035   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer                                                  3            10  avgt   15  13214.306 ±  61.158   ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica                                                   3            10  avgt   15  13147.870 ±  52.318   ns/op
   
   GC Profile                                                                                    (partitionCount)  (topicCount)  Mode  Cnt      Score     Error   Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate                                            3            10  avgt   15     ≈ 10⁻⁴            MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm                                       3            10  avgt   15     ≈ 10⁻⁶              B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count                                                 3            10  avgt   15        ≈ 0            counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate                                             3            10  avgt   15     ≈ 10⁻⁴            MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm                                        3            10  avgt   15     ≈ 10⁻⁶              B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count                                                  3            10  avgt   15        ≈ 0            counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate                                   3            10  avgt   15   1795.576 ±   8.351  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm                              3            10  avgt   15  26136.002 ±   0.005    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space                          3            10  avgt   15   1796.108 ±  11.527  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm                     3            10  avgt   15  26143.702 ± 100.832    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space                      3            10  avgt   15      0.163 ±   0.019  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm                 3            10  avgt   15      2.366 ±   0.270    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.count                                        3            10  avgt   15   2134.000            counts
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.time                                         3            10  avgt   15   1412.000                ms
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate                                    3            10  avgt   15   1804.695 ±   7.193  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.alloc.rate.norm                               3            10  avgt   15  26136.002 ±   0.005    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space                           3            10  avgt   15   1805.666 ±   7.990  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm                      3            10  avgt   15  26150.127 ±  86.455    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space                       3            10  avgt   15      0.166 ±   0.016  MB/sec
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm                  3            10  avgt   15      2.406 ±   0.238    B/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.count                                         3            10  avgt   15   2097.000            counts
   FetchRequestBenchmark.testSerializeFetchRequestForReplica:·gc.time                                          3            10  avgt   15   1395.000                ms
   ```


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947


   I ran a console consumer perf test (at @hachikuji's suggestion) and took a profile. 
   
   ![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
   
   Zoomed in a bit on the records part:
   
   ![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
   
   This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code.
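
   For reference, a minimal sketch of the sendfile path being confirmed here, assuming the usual `FileRecords#writeTo` shape (simplified stand-in, not the actual class):

   ```java
   import java.io.IOException;
   import java.nio.channels.FileChannel;
   import java.nio.channels.GatheringByteChannel;

   // Zero-copy transfer underneath RecordsSend: the kernel moves bytes
   // file -> socket without copying them through user space (sendfile(2)).
   final class FileRecordsSketch {
       private final FileChannel fileChannel;

       FileRecordsSketch(FileChannel fileChannel) {
           this.fileChannel = fileChannel;
       }

       long writeTo(GatheringByteChannel destChannel, long position, int length) throws IOException {
           return fileChannel.transferTo(position, length, destChannel);
       }
   }
   ```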
   
   


----------------------------------------------------------------



[GitHub] [kafka] mumrah merged pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #9008:
URL: https://github.com/apache/kafka/pull/9008


   


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-661049255


   retest this please


----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-656906624


   I agree that some benchmarks would be useful. One of the key differences is how the `MultiRecordsSend` gets constructed, so that is probably one thing worth measuring. We are potentially faster here because we no longer have the conversion to `Struct`.
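
   A minimal sketch of that difference, assuming a FetchRequest `request`, its generated `FetchRequestData` in `data`, and a target `version` (the old-path method names follow the pre-generated-protocol style and are illustrative):

   ```java
   // Old path: message -> Struct -> ByteBuffer (intermediate allocation + copy)
   Struct struct = request.toStruct();
   ByteBuffer oldBuf = ByteBuffer.allocate(struct.sizeOf());
   struct.writeTo(oldBuf);

   // New path: the generated class writes itself directly into the buffer
   ObjectSerializationCache cache = new ObjectSerializationCache();
   ByteBuffer newBuf = ByteBuffer.allocate(data.size(cache, version));
   data.write(new ByteBufferAccessor(newBuf), cache, version);
   ```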


----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456196331



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
##########
@@ -35,6 +36,10 @@
     int readUnsignedVarint();
     ByteBuffer readByteBuffer(int length);
 
+    default BaseRecords readRecords(int length) {
+        throw new UnsupportedOperationException("Not implemented");
+    }

Review comment:
       Are this method and the respective `writeRecords` in the base interface needed? It seems like they're only implemented in two specific cases. Could we not downcast for those cases?
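
   A sketch of the downcast alternative, under the assumption that only `ByteBufferAccessor` can actually carve a record set out of the underlying buffer:

   ```java
   // Read records only when the Readable is backed by a ByteBuffer;
   // anything else is a programming error for a records-carrying message.
   if (readable instanceof ByteBufferAccessor) {
       BaseRecords records = ((ByteBufferAccessor) readable).readRecords(length);
       // ... hand the records to the generated field setter
   } else {
       throw new IllegalStateException("Records can only be read from a ByteBufferAccessor, got "
           + readable.getClass().getName());
   }
   ```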




----------------------------------------------------------------



[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659673793


   > Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included.

   Nice, so roughly for the replica fetch:
   ```
   2021.114 + 3691.157 = 5712.271 ns
   vs
   3.453 + 13147.870 = 13151.323 ns
   ```
   1 - 5712.271 / 13151.323 ≈ 0.57, i.e. about a 57% reduction in CPU time.
   
   Alloc rate normalized comparison (replica fetch, construct + serialize):
   ```
   9832.000 + 13496.000 = 23328 B/op
   vs
   ≈ 0 + 26136.002 = 26136.002 B/op
   ```
   1 - 23328 / 26136.002 ≈ 0.107, i.e. about an 11% reduction in garbage generation.
   
   I think the garbage generation will massively improve once we can get rid of toPartitionDataMap later.


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456518529



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Yeah, it's possible, but rather complicated, I think. We would need to manage our own byte array and grow it on demand (like what happens inside ByteArrayOutputStream). Then we could use `ByteBuffer#slice` to pass views of this array to the ByteBufferSend objects. I don't think the current approach is any worse than before in terms of array allocations, so maybe we could save this for a future optimization?
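
   For the record, a rough sketch of that future optimization (the hand-rolled growth stands in for what ByteArrayOutputStream does internally; not proposed for this PR):

   ```java
   import java.nio.ByteBuffer;

   // Sketch of the slice idea: one growable buffer, zero-copy views per flush.
   final class SliceFlusher {
       private ByteBuffer buffer = ByteBuffer.allocate(64);
       private int flushedUpTo = 0;

       // Grow on demand, like ByteArrayOutputStream does internally.
       void ensureCapacity(int extra) {
           if (buffer.remaining() < extra) {
               ByteBuffer bigger = ByteBuffer.allocate(
                   Math.max(buffer.capacity() * 2, buffer.position() + extra));
               buffer.flip();
               bigger.put(buffer);
               buffer = bigger;
           }
       }

       // Emit a view of only the bytes written since the last flush, no copy.
       ByteBuffer flushSlice() {
           ByteBuffer view = buffer.duplicate();
           view.position(flushedUpTo);
           view.limit(buffer.position());
           flushedUpTo = buffer.position();
           return view.slice();
       }
   }
   ```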




----------------------------------------------------------------



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455828890



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));
+        }));
+        return Collections.unmodifiableMap(result);
+    }
+
+    private List<TopicPartition> toForgottonTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {

Review comment:
       Typo "Forgotton"




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455407043



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##########
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+       Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
       As an aside, it would be awesome to add `Optional` support to the generated classes. We have had so many bugs which were caused by sentinel values sneaking into unexpected places.
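
   The hunk above shows exactly the manual mapping that generated `Optional` support would replace:

   ```java
   // Today: translate the wire sentinel into an Optional by hand at the boundary
   Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
       .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);

   // Hypothetical generated accessor (does not exist yet):
   // Optional<Integer> leaderEpoch = fetchPartition.currentLeaderEpochOpt();
   ```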




----------------------------------------------------------------



[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456197321



##########
File path: generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##########
@@ -2078,6 +2103,11 @@ private void generateFieldEquals(FieldSpec field) {
                 buffer.printf("if (!Arrays.equals(this.%s, other.%s)) return false;%n",
                     field.camelCaseName(), field.camelCaseName());
             }
+        } else if (field.type().isRecords()) {
+            // TODO is this valid for record instances?

Review comment:
       The hashCode of `MemoryRecords` takes into account the buffer position, so it's kind of useless. `FileRecords` doesn't even define it. We should consider defining the hashCode and equals of `Records` to be identity based.
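
   A sketch of the identity-based suggestion (to live in a `Records` base implementation; not existing code):

   ```java
   // Identity semantics: two record sets compare equal only if they are the
   // same object, sidestepping position-sensitive buffer comparisons.
   @Override
   public boolean equals(Object o) {
       return this == o;
   }

   @Override
   public int hashCode() {
       return System.identityHashCode(this);
   }
   ```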




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457546281



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       Thanks for the explanation, @ijuma. I missed the semantics of `newLength`
   
   After the initial few top-level fields, each partition will have something like 38 bytes preceding its records (at a minimum, aborted transactions could increase that). Maybe we could increase initial capacity to 64 bytes? 
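
   In code that is just the constructor argument (sketch; `ByteArrayOutputStream`'s no-arg constructor starts at 32 bytes):

   ```java
   // Seed the stream so a typical ~38-byte partition header fits without an
   // immediate internal resize (the default initial capacity is 32 bytes).
   this.byteArrayOutputStream = new ByteArrayOutputStream(64);
   ```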
   




----------------------------------------------------------------



[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455318129



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/FetchRequestBenchmark.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.jmh.common;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ByteBufferChannel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 15)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class FetchRequestBenchmark {
+    @Param({"1000"})
+    private int topicCount;
+
+    @Param({"20"})
+    private int partitionCount;
+
+    Map<TopicPartition, FetchRequest.PartitionData> fetchData;
+
+    RequestHeader header;
+
+    FetchRequest request;
+
+
+    @Setup(Level.Trial)
+    public void setup() {
+        this.fetchData = new HashMap<>();
+        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
+            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
+                FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+                    0, 0, 4096, Optional.empty());
+                fetchData.put(new TopicPartition(String.format("topic-%04d", topicIdx), partitionId), partitionData);
+            }
+        }
+
+        this.header = new RequestHeader(ApiKeys.FETCH, ApiKeys.FETCH.latestVersion(), "jmh-benchmark", 100);
+        this.request = FetchRequest.Builder.forConsumer(0, 0, fetchData).build(ApiKeys.FETCH.latestVersion());

Review comment:
       Can we please have benchmarks for both forConsumer and forReplica fetch requests?
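
   For reference, the replica variant could be set up along these lines (sketch; the `forReplica` argument order is assumed here and worth double-checking against the builder):

   ```java
   // Replica fetches carry a replica id and pin an allowed request version.
   FetchRequest replicaRequest = FetchRequest.Builder
       .forReplica(ApiKeys.FETCH.latestVersion(), 1 /* replicaId */, 0, 0, fetchData)
       .build(ApiKeys.FETCH.latestVersion());
   ```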




----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-666531948


   These test failures are from known flaky tests that already have JIRA tickets.


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-662794475


   Thanks @cmccabe, great feedback. I've updated RecordsWriter to allocate a single ByteBuffer based on a pre-calculated length (total message size minus all records size). This avoids the buffer resizing altogether.
   
   I like your suggestions for Writable#close and moving readRecords into ByteBufferAccessor. I'll save these for a follow-on.
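
   Roughly, the sizing works out like this (sketch with illustrative names; `totalRecordsSize` is a hypothetical helper, not the real accessor):

   ```java
   // Everything except the record sets is known up front, so one exact-size
   // buffer covers all of the non-record bytes with no resizing.
   int totalMessageSize = data.size(cache, version);    // full wire size
   int recordsSize = totalRecordsSize(data);            // hypothetical: sum of all record-set sizes
   ByteBuffer nonRecordsBuffer = ByteBuffer.allocate(totalMessageSize - recordsSize);
   ```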


----------------------------------------------------------------



[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();

Review comment:
       Yeah, I agree. @cmccabe had a suggestion about adding `Writable#close`, which would achieve the same goal. I think this would be nice and clean things up a bit. I'll open a follow-up PR for this.
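   
   Roughly, a hypothetical shape for that follow-up:
   
   ```java
   // Hypothetical sketch: Writable becomes closeable so the final flush
   // happens in close() and callers can't forget it.
   public interface Writable extends Closeable {
       // ... existing write methods ...
   
       @Override
       default void close() {
           // no-op by default; RecordsWriter would override this to flush
           // any pending bytes as a final ByteBufferSend
       }
   }
   ```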

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+        int bodySize = (int) sends.stream().mapToLong(Send::size).sum();

Review comment:
       Do you mean something like `Math.toIntExact`?
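   
   e.g., the same computation but failing loudly on overflow (sketch):
   
   ```java
   // Math.toIntExact throws ArithmeticException on overflow instead of
   // silently truncating the long to an int.
   int bodySize = Math.toIntExact(sends.stream().mapToLong(Send::size).sum());
   ```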

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
             case PRODUCE:
                 return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                return new FetchRequest(struct, apiVersion);
+                return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);

Review comment:
       I just wanted to remove the Struct constructor of FetchRequest completely. Eventually, `RequestContext#parseRequest(ByteBuffer)` will stop using Structs and pass the message data classes to `AbstractRequest#parseRequest` (or similar).
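   
   Something like this, eventually (a sketch; the exact plumbing is still TBD):
   
   ```java
   // Hypothetical end state: deserialize straight into the generated class,
   // with no intermediate Struct.
   case FETCH:
       return new FetchRequest(
           new FetchRequestData(new ByteBufferAccessor(buffer), apiVersion), apiVersion);
   ```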

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+    private final ByteBuffer buf;
+
+    public RecordsReader(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.get();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.getShort();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.getLong();
+    }
+
+    @Override
+    public double readDouble() {
+        return ByteUtils.readDouble(buf);
+    }
+
+    @Override
+    public void readArray(byte[] arr) {
+        buf.get(arr);
+    }
+
+    @Override
+    public int readUnsignedVarint() {
+        return ByteUtils.readUnsignedVarint(buf);
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer(int length) {

Review comment:
       This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. But either way, looking at it, it seems it should always be in range, since this is used by zero-copy byte fields in the message classes, e.g.
   
   ```java
   int len = _reader.readInt();
   if (len > 0) {
     this.someZeroCopyField = _reader.readByteBuffer(len);
   }
   ```
   
   So generally it's probably safe. In the case of a corrupt message where the length is wrong, ByteBuffer#limit will throw an error and parsing will fail. It probably would be nice to put a range check in ByteBufferAccessor so we can throw a more useful error.
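   
   Something along these lines, perhaps (a sketch of a range-checked variant):
   
   ```java
   // Sketch: validate the length against the remaining bytes before slicing,
   // so a corrupt length yields a useful error instead of a raw buffer exception.
   @Override
   public ByteBuffer readByteBuffer(int length) {
       if (length > buf.remaining()) {
           throw new RuntimeException("Error reading byte buffer of " + length +
               " bytes: only " + buf.remaining() + " bytes available");
       }
       ByteBuffer res = buf.slice();
       res.limit(length);
       buf.position(buf.position() + length);
       return res;
   }
   ```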

##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
-            "about": "The producer id associated with the aborted transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": "0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId",
+              "about": "The producer id associated with the aborted transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true,

Review comment:
       I see what you mean. If we have a bug that causes us to hit the preferred-replica code path for an older API version, we should fail to serialize the message rather than send it to a client that doesn't understand follower redirection.
   
   Good catch.
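   
   For reference, the generated write path for a non-ignorable field looks roughly like this (a sketch; names approximate):
   
   ```java
   // With "ignorable": false, writing a non-default value at a version that
   // lacks the field fails loudly instead of silently dropping it.
   if (_version < 11) {
       if (this.preferredReadReplica != -1) {
           throw new UnsupportedVersionException(
               "Attempted to write a non-default preferredReadReplica at version " + _version);
       }
   }
   ```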

##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true,
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this limit may not be honored." },
-    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false,
+    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true,

Review comment:
       I changed this to make the JSON schema match what was previously in FetchRequest.java. During serialization, we would simply stick the isolation level in the Struct regardless of the API version:
   
   ```java
   struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
   ```
   
   So even if we were writing out a v3 FetchRequest, whatever value we put here would be ignored and not sent out. There were also some unit tests that utilized this behavior.
   
   Your assessment sounds correct, though, so it probably doesn't matter whether it's ignorable or not.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r457496688



##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteArrayOutputStream byteArrayOutputStream;
+    private final DataOutput output;
+
+    public RecordsWriter(String dest, Consumer<Send> sendConsumer) {
+        this.dest = dest;
+        this.sendConsumer = sendConsumer;
+        this.byteArrayOutputStream = new ByteArrayOutputStream();
+        this.output = new DataOutputStream(this.byteArrayOutputStream);
+    }
+
+    @Override
+    public void writeByte(byte val) {
+        writeQuietly(() -> output.writeByte(val));
+    }
+
+    @Override
+    public void writeShort(short val) {
+        writeQuietly(() -> output.writeShort(val));
+    }
+
+    @Override
+    public void writeInt(int val) {
+        writeQuietly(() -> output.writeInt(val));
+    }
+
+    @Override
+    public void writeLong(long val) {
+        writeQuietly(() -> output.writeLong(val));
+
+    }
+
+    @Override
+    public void writeDouble(double val) {
+        writeQuietly(() -> ByteUtils.writeDouble(val, output));
+
+    }
+
+    @Override
+    public void writeByteArray(byte[] arr) {
+        writeQuietly(() -> output.write(arr));
+    }
+
+    @Override
+    public void writeUnsignedVarint(int i) {
+        writeQuietly(() -> ByteUtils.writeUnsignedVarint(i, output));
+    }
+
+    @Override
+    public void writeByteBuffer(ByteBuffer src) {
+        writeQuietly(() -> output.write(src.array(), src.position(), src.remaining()));
+    }
+
+    @FunctionalInterface
+    private interface IOExceptionThrowingRunnable {
+        void run() throws IOException;
+    }
+
+    private void writeQuietly(IOExceptionThrowingRunnable runnable) {
+        try {
+            runnable.run();
+        } catch (IOException e) {
+            throw new RuntimeException("Writable encountered an IO error", e);
+        }
+    }
+
+    @Override
+    public void writeRecords(BaseRecords records) {
+        flush();
+        sendConsumer.accept(records.toSend(dest));
+    }
+
+    /**
+     * Flush any pending bytes as a ByteBufferSend and reset the buffer
+     */
+    public void flush() {
+        ByteBufferSend send = new ByteBufferSend(dest,
+                ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));

Review comment:
       @mumrah Thanks for checking this. However, the behavior in JDK 14 has not changed in that way. Performance would be atrocious if it did:
   
   ```java
   private void ensureCapacity(int minCapacity) {
           // overflow-conscious code
           int oldCapacity = buf.length;
           int minGrowth = minCapacity - oldCapacity;
           if (minGrowth > 0) {
               buf = Arrays.copyOf(buf, ArraysSupport.newLength(oldCapacity,
                       minGrowth, oldCapacity /* preferred growth */));
           }
    }
    ```
   
   The third parameter passed to `newLength` is the preferred growth, which is `oldCapacity`. That is, it doubles if it doesn't cause overflow. We should probably double for `ByteBufferOutputStream` too _if_ we have no estimate of the expected size. `1.1` growth makes sense if we do have a reasonable estimate (which is the case in current usage, I believe, but perhaps not in this case).
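   
   In other words, something like this (sketch):
   
   ```java
   // Suggested policy: double when there is no size estimate, and grow by
   // ~1.1x when the initial capacity was already a reasonable estimate.
   private int newCapacity(int oldCapacity, int minCapacity, boolean hadSizeEstimate) {
       float factor = hadSizeEstimate ? 1.1f : 2.0f;
       return Math.max(minCapacity, (int) (oldCapacity * factor));
   }
   ```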




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org