Posted to commits@kafka.apache.org by ma...@apache.org on 2022/09/02 08:52:29 UTC

[kafka] branch 2.8 updated (cd032154a7 -> 00acd559a8)

This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a change to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from cd032154a7 KAFKA-14107: Upgrade Jetty version for CVE fixes (#12440)
     new 14951a83e3 MINOR: Add more validation during KRPC deserialization
     new b7dd40ff2b MINOR: Add configurable max receive size for SASL authentication requests
     new 301a0d3f64 MINOR: Disable kraft system tests in 2.8 branch
     new 7ef72abb4d MINOR: Update LICENSE for 2.8.2
     new f6583c9d63 MINOR: Bump version in upgrade guide to 2.8.2
     new 00acd559a8 MINOR: Update version to 2.8.2

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 LICENSE-binary                                     | 37 ++++-----
 checkstyle/suppressions.xml                        |  6 ++
 .../config/internals/BrokerSecurityConfigs.java    |  6 ++
 .../kafka/common/protocol/ByteBufferAccessor.java  | 14 +++-
 .../common/protocol/DataInputStreamReadable.java   | 18 ++++-
 .../org/apache/kafka/common/protocol/Readable.java |  9 +--
 .../apache/kafka/common/record/DefaultRecord.java  |  2 +
 .../authenticator/SaslServerAuthenticator.java     | 16 +++-
 .../common/message/SimpleArraysMessageTest.java    | 54 +++++++++++++
 .../common/protocol/ByteBufferAccessorTest.java    | 58 ++++++++++++++
 .../kafka/common/record/DefaultRecordTest.java     | 14 ++++
 .../kafka/common/requests/RequestContextTest.java  | 83 +++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 93 ++++++++++++++++++++++
 .../kafka/common/security/TestSecurityConfig.java  |  2 +
 .../authenticator/SaslAuthenticatorTest.java       | 46 +++++++++++
 .../authenticator/SaslServerAuthenticatorTest.java |  6 +-
 ...ecordsMessage.json => SimpleArraysMessage.json} | 15 ++--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 +
 .../main/scala/kafka/tools/TestRaftServer.scala    |  6 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +
 docs/js/templateData.js                            |  2 +-
 docs/upgrade.html                                  |  4 +-
 .../apache/kafka/message/MessageDataGenerator.java |  9 ++-
 gradle.properties                                  |  2 +-
 .../apache/kafka/raft/internals/StringSerde.java   |  3 +-
 streams/quickstart/java/pom.xml                    |  2 +-
 .../src/main/resources/archetype-resources/pom.xml |  2 +-
 streams/quickstart/pom.xml                         |  2 +-
 tests/kafkatest/__init__.py                        |  2 +-
 tests/kafkatest/sanity_checks/test_bounce.py       |  4 -
 .../sanity_checks/test_console_consumer.py         |  5 +-
 .../sanity_checks/test_verifiable_producer.py      | 74 -----------------
 tests/kafkatest/services/kafka/quorum.py           |  6 +-
 tests/kafkatest/tests/core/security_test.py        |  2 +-
 35 files changed, 468 insertions(+), 148 deletions(-)
 create mode 100644 clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
 copy clients/src/test/resources/common/message/{SimpleRecordsMessage.json => SimpleArraysMessage.json} (71%)


[kafka] 05/06: MINOR: Bump version in upgrade guide to 2.8.2

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f6583c9d63379e18702bab29bff622af30d80b57
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Sep 2 14:08:55 2022 +0530

    MINOR: Bump version in upgrade guide to 2.8.2
---
 docs/upgrade.html | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index ecebba9ec2..0d5094657d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,7 +19,7 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
-<h4><a id="upgrade_2_8_1" href="#upgrade_2_8_1">Upgrading to 2.8.1 from any version 0.8.x through 2.7.x</a></h4>
+<h4><a id="upgrade_2_8_2" href="#upgrade_2_8_2">Upgrading to 2.8.2 from any version 0.8.x through 2.7.x</a></h4>
 
 <p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
     Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
@@ -79,6 +79,8 @@
     <li>Kafka Streams introduce a type-safe <code>split()</code> operator as a substitution for deprecated <code>KStream#branch()</code> method
         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream">KIP-418</a>).
     </li>
+    <li>KRaft mode is available as Early Access feature in 2.8.0 release. This is discontinued in 2.8.2 release. Please use 3.0 relase and above for KRaft mode
+    </li>
 </ul>
 
 <h4><a id="upgrade_2_7_1" href="#upgrade_2_7_1">Upgrading to 2.7.1 from any version 0.8.x through 2.6.x</a></h4>


[kafka] 03/06: MINOR: Disable kraft system tests in 2.8 branch

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 301a0d3f6434e7f9ec0a20ad0669eea01f936e40
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Thu Sep 1 23:26:03 2022 +0530

    MINOR: Disable kraft system tests in 2.8 branch
---
 tests/kafkatest/sanity_checks/test_bounce.py       |  4 --
 .../sanity_checks/test_console_consumer.py         |  5 +-
 .../sanity_checks/test_verifiable_producer.py      | 74 ----------------------
 tests/kafkatest/services/kafka/quorum.py           |  6 +-
 tests/kafkatest/tests/core/security_test.py        |  2 +-
 5 files changed, 5 insertions(+), 86 deletions(-)

diff --git a/tests/kafkatest/sanity_checks/test_bounce.py b/tests/kafkatest/sanity_checks/test_bounce.py
index c01f23b0cb..c954fba17c 100644
--- a/tests/kafkatest/sanity_checks/test_bounce.py
+++ b/tests/kafkatest/sanity_checks/test_bounce.py
@@ -44,10 +44,6 @@ class TestBounce(Test):
         if self.zk:
             self.zk.start()
 
-    @cluster(num_nodes=6)
-    @parametrize(metadata_quorum=quorum.remote_raft)
-    @cluster(num_nodes=4)
-    @parametrize(metadata_quorum=quorum.colocated_raft)
     @cluster(num_nodes=4)
     @parametrize(metadata_quorum=quorum.zk)
     def test_simple_run(self, metadata_quorum):
diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py
index 0847ce0cb4..2448b75e03 100644
--- a/tests/kafkatest/sanity_checks/test_console_consumer.py
+++ b/tests/kafkatest/sanity_checks/test_console_consumer.py
@@ -43,12 +43,9 @@ class ConsoleConsumerTest(Test):
         if self.zk:
             self.zk.start()
 
-    @cluster(num_nodes=3)
-    @matrix(security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=quorum.all_raft)
+
     @cluster(num_nodes=4)
-    @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['PLAIN'], metadata_quorum=quorum.all_raft)
     @matrix(security_protocol=['SASL_SSL'], sasl_mechanism=['SCRAM-SHA-256', 'SCRAM-SHA-512']) # SCRAM not yet supported with Raft
-    @matrix(security_protocol=['SASL_PLAINTEXT', 'SASL_SSL'], metadata_quorum=quorum.all_raft)
     def test_lifecycle(self, security_protocol, sasl_mechanism='GSSAPI', metadata_quorum=quorum.zk):
         """Check that console consumer starts/stops properly, and that we are capturing log output."""
 
diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
index 7fcb603d59..fb21c08da5 100644
--- a/tests/kafkatest/sanity_checks/test_verifiable_producer.py
+++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py
@@ -96,77 +96,3 @@ class TestVerifiableProducer(Test):
         num_produced = self.producer.num_acked
         assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
 
-    @cluster(num_nodes=4)
-    @matrix(inter_broker_security_protocol=['PLAINTEXT', 'SSL'], metadata_quorum=[quorum.remote_raft])
-    @matrix(inter_broker_security_protocol=['SASL_SSL'], inter_broker_sasl_mechanism=['PLAIN', 'GSSAPI'],
-            metadata_quorum=[quorum.remote_raft])
-    def test_multiple_raft_security_protocols(
-            self, inter_broker_security_protocol, inter_broker_sasl_mechanism='GSSAPI', metadata_quorum=quorum.remote_raft):
-        """
-        Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
-        verify that we can produce a small number of messages.  The inter-controller and broker-to-controller
-        security protocols are defined to be different (which differs from the above test, where they were the same).
-        """
-        self.kafka.security_protocol = self.kafka.interbroker_security_protocol = inter_broker_security_protocol
-        self.kafka.client_sasl_mechanism = self.kafka.interbroker_sasl_mechanism = inter_broker_sasl_mechanism
-        controller_quorum = self.kafka.controller_quorum
-        sasl_mechanism = 'PLAIN' if inter_broker_sasl_mechanism == 'GSSAPI' else 'GSSAPI'
-        if inter_broker_security_protocol == 'PLAINTEXT':
-            controller_security_protocol = 'SSL'
-            intercontroller_security_protocol = 'SASL_SSL'
-        elif inter_broker_security_protocol == 'SSL':
-            controller_security_protocol = 'SASL_SSL'
-            intercontroller_security_protocol = 'PLAINTEXT'
-        else: # inter_broker_security_protocol == 'SASL_SSL'
-            controller_security_protocol = 'PLAINTEXT'
-            intercontroller_security_protocol = 'SSL'
-        controller_quorum.controller_security_protocol = controller_security_protocol
-        controller_quorum.controller_sasl_mechanism = sasl_mechanism
-        controller_quorum.intercontroller_security_protocol = intercontroller_security_protocol
-        controller_quorum.intercontroller_sasl_mechanism = sasl_mechanism
-        self.kafka.start()
-
-        node = self.producer.nodes[0]
-        node.version = KafkaVersion(str(DEV_BRANCH))
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
-             err_msg="Producer failed to start in a reasonable amount of time.")
-
-        # See above comment above regarding use of version.vstring (distutils.version.LooseVersion)
-        assert is_version(node, [node.version.vstring], logger=self.logger)
-
-        self.producer.wait()
-        num_produced = self.producer.num_acked
-        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
-
-    @cluster(num_nodes=4)
-    @parametrize(metadata_quorum=quorum.remote_raft)
-    def test_multiple_raft_sasl_mechanisms(self, metadata_quorum):
-        """
-        Test for remote Raft cases that we can start VerifiableProducer on the current branch snapshot version, and
-        verify that we can produce a small number of messages.  The inter-controller and broker-to-controller
-        security protocols are both SASL_PLAINTEXT but the SASL mechanisms are different (we set
-        GSSAPI for the inter-controller mechanism and PLAIN for the broker-to-controller mechanism).
-        This test differs from the above tests -- he ones above used the same SASL mechanism for both paths.
-        """
-        self.kafka.security_protocol = self.kafka.interbroker_security_protocol = 'PLAINTEXT'
-        controller_quorum = self.kafka.controller_quorum
-        controller_quorum.controller_security_protocol = 'SASL_PLAINTEXT'
-        controller_quorum.controller_sasl_mechanism = 'PLAIN'
-        controller_quorum.intercontroller_security_protocol = 'SASL_PLAINTEXT'
-        controller_quorum.intercontroller_sasl_mechanism = 'GSSAPI'
-        self.kafka.start()
-
-        node = self.producer.nodes[0]
-        node.version = KafkaVersion(str(DEV_BRANCH))
-        self.producer.start()
-        wait_until(lambda: self.producer.num_acked > 5, timeout_sec=15,
-             err_msg="Producer failed to start in a reasonable amount of time.")
-
-        # See above comment above regarding use of version.vstring (distutils.version.LooseVersion)
-        assert is_version(node, [node.version.vstring], logger=self.logger)
-
-        self.producer.wait()
-        num_produced = self.producer.num_acked
-        assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)
-
diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py
index 7348fc1594..9513c8e74b 100644
--- a/tests/kafkatest/services/kafka/quorum.py
+++ b/tests/kafkatest/services/kafka/quorum.py
@@ -21,14 +21,14 @@ remote_raft = 'REMOTE_RAFT' # separate Controllers in KRaft mode, used during/af
 # How we will parameterize tests that exercise all quorum styles
 #   [“ZK”, “REMOTE_RAFT”, "COLOCATED_RAFT"] during the KIP-500 bridge release(s)
 #   [“REMOTE_RAFT”, "COLOCATED_RAFT”] after the KIP-500 bridge release(s)
-all = [zk, remote_raft, colocated_raft]
+all = [zk]
 # How we will parameterize tests that exercise all KRaft quorum styles
-all_raft = [remote_raft, colocated_raft]
+all_raft = []
 # How we will parameterize tests that are unrelated to upgrades:
 #   [“ZK”] before the KIP-500 bridge release(s)
 #   [“ZK”, “REMOTE_RAFT”] during the KIP-500 bridge release(s) and in preview releases
 #   [“REMOTE_RAFT”] after the KIP-500 bridge release(s)
-all_non_upgrade = [zk, remote_raft]
+all_non_upgrade = [zk]
 
 def for_test(test_context):
     # A test uses ZooKeeper if it doesn't specify a metadata quorum or if it explicitly specifies ZooKeeper
diff --git a/tests/kafkatest/tests/core/security_test.py b/tests/kafkatest/tests/core/security_test.py
index 8e2b695074..bf54632a98 100644
--- a/tests/kafkatest/tests/core/security_test.py
+++ b/tests/kafkatest/tests/core/security_test.py
@@ -132,7 +132,7 @@ class SecurityTest(EndToEndTest):
         self.consumer.start()
 
     @cluster(num_nodes=2)
-    @matrix(metadata_quorum=[quorum.zk, quorum.remote_raft])
+    @matrix(metadata_quorum=[quorum.zk])
     def test_quorum_ssl_endpoint_validation_failure(self, metadata_quorum=quorum.zk):
         """
         Test that invalid hostname in ZooKeeper or Raft Controller results in broker inability to start.


[kafka] 01/06: MINOR: Add more validation during KRPC deserialization

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 14951a83e3fdead212156e5532359500d72f68bc
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri May 20 15:23:12 2022 -0700

    MINOR: Add more validation during KRPC deserialization
    
    When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
    other things), check that we have at least N bytes remaining before allocating an array of size N.
    
    Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
    remaining. Instead, when reading an individual record in the Raft layer, simply create a
    ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
    
    Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
    RequestResponseTest.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>, Colin McCabe <co...@cmccabe.xyz>
    
    Co-authored-by: Colin McCabe <co...@cmccabe.xyz>
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
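
The idea behind the change, shown as a minimal standalone sketch (the BoundsCheckedReader class below is hypothetical, not Kafka's actual ByteBufferAccessor, whose updated readArray appears in the diff further down): compare the requested length against the bytes remaining in the buffer before allocating, so a corrupted length prefix cannot force a huge allocation.

    import java.nio.ByteBuffer;

    // Hypothetical illustration of the bounds check; Kafka's real implementation
    // is ByteBufferAccessor.readArray(int) in the diff below.
    public class BoundsCheckedReader {
        private final ByteBuffer buf;

        public BoundsCheckedReader(ByteBuffer buf) {
            this.buf = buf;
        }

        public byte[] readArray(int size) {
            int remaining = buf.remaining();
            if (size > remaining) {
                throw new RuntimeException("Error reading byte array of " + size
                    + " byte(s): only " + remaining + " byte(s) available");
            }
            byte[] arr = new byte[size];
            buf.get(arr); // safe: size <= remaining
            return arr;
        }

        public static void main(String[] args) {
            BoundsCheckedReader reader =
                new BoundsCheckedReader(ByteBuffer.wrap(new byte[] {0x01, 0x02}));
            System.out.println(reader.readArray(2).length); // 2
            reader.readArray(100);                          // throws RuntimeException
        }
    }
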
---
 checkstyle/suppressions.xml                        |  4 +
 .../kafka/common/protocol/ByteBufferAccessor.java  | 14 +++-
 .../common/protocol/DataInputStreamReadable.java   | 18 ++++-
 .../org/apache/kafka/common/protocol/Readable.java |  9 +--
 .../apache/kafka/common/record/DefaultRecord.java  |  2 +
 .../common/message/SimpleArraysMessageTest.java    | 54 +++++++++++++
 .../common/protocol/ByteBufferAccessorTest.java    | 58 ++++++++++++++
 .../kafka/common/record/DefaultRecordTest.java     | 14 ++++
 .../kafka/common/requests/RequestContextTest.java  | 83 +++++++++++++++++++
 .../kafka/common/requests/RequestResponseTest.java | 93 ++++++++++++++++++++++
 .../common/message/SimpleArraysMessage.json        | 29 +++++++
 .../main/scala/kafka/tools/TestRaftServer.scala    |  6 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |  6 +-
 .../apache/kafka/message/MessageDataGenerator.java |  9 ++-
 .../apache/kafka/raft/internals/StringSerde.java   |  3 +-
 15 files changed, 378 insertions(+), 24 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8163f78f85..8f91d98738 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -153,6 +153,10 @@
     <suppress checks="JavaNCSS"
               files="DistributedHerderTest.java"/>
 
+    <!-- Raft -->
+    <suppress checks="NPathComplexity"
+              files="RecordsIterator.java"/>
+
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
               files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
index 3c5c309731..712973e369 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
@@ -54,8 +54,15 @@ public class ByteBufferAccessor implements Readable, Writable {
     }
 
     @Override
-    public void readArray(byte[] arr) {
+    public byte[] readArray(int size) {
+        int remaining = buf.remaining();
+        if (size > remaining) {
+            throw new RuntimeException("Error reading byte array of " + size + " byte(s): only " + remaining +
+             " byte(s) available");
+        }
+        byte[] arr = new byte[size];
         buf.get(arr);
+        return arr;
     }
 
     @Override
@@ -133,6 +140,11 @@ public class ByteBufferAccessor implements Readable, Writable {
         return ByteUtils.readVarlong(buf);
     }
 
+    @Override
+    public int remaining() {
+        return buf.remaining();
+    }
+
     public void flip() {
         buf.flip();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
index 93c6c597d7..3f0b96757e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
@@ -76,10 +76,12 @@ public class DataInputStreamReadable implements Readable, Closeable {
     }
 
     @Override
-    public void readArray(byte[] arr) {
+    public byte[] readArray(final int length) {
         try {
+            byte[] arr = new byte[length];
             input.readFully(arr);
-        } catch (IOException e) {
+            return arr;
+        }  catch (IOException e) {
             throw new RuntimeException(e);
         }
     }
@@ -95,8 +97,7 @@ public class DataInputStreamReadable implements Readable, Closeable {
 
     @Override
     public ByteBuffer readByteBuffer(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
+        byte[] arr = readArray(length);
         return ByteBuffer.wrap(arr);
     }
 
@@ -118,6 +119,15 @@ public class DataInputStreamReadable implements Readable, Closeable {
         }
     }
 
+    @Override
+    public int remaining() {
+        try {
+            return input.available();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     @Override
     public void close() {
         try {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 46879cde53..f453d12e17 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -32,15 +32,15 @@ public interface Readable {
     int readInt();
     long readLong();
     double readDouble();
-    void readArray(byte[] arr);
+    byte[] readArray(int length);
     int readUnsignedVarint();
     ByteBuffer readByteBuffer(int length);
     int readVarint();
     long readVarlong();
+    int remaining();
 
     default String readString(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
+        byte[] arr = readArray(length);
         return new String(arr, StandardCharsets.UTF_8);
     }
 
@@ -48,8 +48,7 @@ public interface Readable {
         if (unknowns == null) {
             unknowns = new ArrayList<>();
         }
-        byte[] data = new byte[size];
-        readArray(data);
+        byte[] data = readArray(size);
         unknowns.add(new RawTaggedField(tag, data));
         return unknowns;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index d85f1000bc..94896a7c75 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -355,6 +355,8 @@ public class DefaultRecord implements Record {
             int numHeaders = ByteUtils.readVarint(buffer);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+            if (numHeaders > buffer.remaining())
+                throw new InvalidRecordException("Found invalid number of record headers. " + numHeaders + " is larger than the remaining size of the buffer");
 
             final Header[] headers;
             if (numHeaders == 0)
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
new file mode 100644
index 0000000000..1b78adbb96
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.message;
+
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SimpleArraysMessageTest {
+    @Test
+    public void testArrayBoundsChecking() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x7f, // Set size of first array to 126 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 126, but there are only 7 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+
+    @Test
+    public void testArrayBoundsCheckingOtherArray() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x01, // Set size of first array to 0
+            (byte) 0x7e, // Set size of second array to 125 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 125, but there are only 6 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
new file mode 100644
index 0000000000..6a0c6c2681
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ByteBufferAccessorTest {
+    @Test
+    public void testReadArray() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        final byte[] testArray = new byte[] {0x4b, 0x61, 0x46};
+        accessor.writeByteArray(testArray);
+        accessor.writeInt(12345);
+        accessor.flip();
+        final byte[] testArray2 = accessor.readArray(3);
+        assertArrayEquals(testArray, testArray2);
+        assertEquals(12345, accessor.readInt());
+        assertEquals("Error reading byte array of 3 byte(s): only 0 byte(s) available",
+            assertThrows(RuntimeException.class,
+                () -> accessor.readArray(3)).getMessage());
+    }
+
+    @Test
+    public void testReadString() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        String testString = "ABC";
+        final byte[] testArray = testString.getBytes(StandardCharsets.UTF_8);
+        accessor.writeByteArray(testArray);
+        accessor.flip();
+        assertEquals("ABC", accessor.readString(3));
+        assertEquals("Error reading byte array of 2 byte(s): only 0 byte(s) available",
+                assertThrows(RuntimeException.class,
+                        () -> accessor.readString(2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d321d..125c104c04 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -247,6 +247,20 @@ public class DefaultRecordTest {
         buf.flip();
         assertThrows(InvalidRecordException.class,
             () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
+
+        ByteBuffer buf2 = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2);
+        buf2.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf2);
+        ByteUtils.writeVarint(offsetDelta, buf2);
+        ByteUtils.writeVarint(-1, buf2); // null key
+        ByteUtils.writeVarint(-1, buf2); // null value
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2); // more headers than remaining buffer size, not allowed
+        buf2.position(buf2.limit());
+
+        buf2.flip();
+        assertThrows(InvalidRecordException.class,
+                () -> DefaultRecord.readFrom(buf2, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 4415ff960a..254dea0430 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -16,22 +16,31 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RequestContextTest {
@@ -104,4 +113,78 @@ public class RequestContextTest {
         assertEquals(expectedResponse, parsedResponse.data());
     }
 
+    @Test
+    public void testInvalidRequestForImplicitHashCollection() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the topics array
+        corruptBuffer.putInt(8, (Integer.MAX_VALUE - 1) / 2);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Tried to allocate a collection of size 1073741823, but there are only 17 bytes remaining.", msg);
+    }
+
+    @Test
+    public void testInvalidRequestForArrayList() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the partitions array
+        corruptBuffer.putInt(17, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals(
+                "Tried to allocate a collection of size 2147483647, but there are only 8 bytes remaining.", msg);
+    }
+
+    private ByteBuffer produceRequest(short version) {
+        ProduceRequestData data = new ProduceRequestData()
+                .setAcks((short) -1)
+                .setTimeoutMs(1);
+        data.topicData().add(
+                new ProduceRequestData.TopicProduceData()
+                        .setName("foo")
+                        .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
+                                .setIndex(42))));
+
+        return serialize(version, data);
+    }
+
+    private ByteBuffer serialize(short version, ApiMessage data) {
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        data.size(cache, version);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        data.write(new ByteBufferAccessor(buffer), cache, version);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Test
+    public void testInvalidRequestForByteArray() throws UnknownHostException {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = serialize(version, new SaslAuthenticateRequestData().setAuthBytes(new byte[0]));
+        // corrupt the length of the bytes array
+        corruptBuffer.putInt(0, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, version, "console-producer", 1);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 0 byte(s) available", msg);
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 9346665058..9a4f2e020d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -177,6 +177,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.types.RawTaggedField;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
@@ -197,10 +198,12 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -221,6 +224,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.FETCH;
 import static org.apache.kafka.common.protocol.ApiKeys.JOIN_GROUP;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
+import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -2759,4 +2763,93 @@ public class RequestResponseTest {
         assertEquals(Integer.valueOf(1), createUpdateMetadataResponse().errorCounts().get(Errors.NONE));
         assertEquals(Integer.valueOf(1), createWriteTxnMarkersResponse().errorCounts().get(Errors.NONE));
     }
+
+    @Test
+    public void testInvalidSaslHandShakeRequest() {
+        AbstractRequest request = new SaslHandshakeRequest.Builder(
+                new SaslHandshakeRequestData().setMechanism("PLAIN")).build();
+        ByteBuffer serializedBytes = request.serialize();
+        // corrupt the length of the sasl mechanism string
+        serializedBytes.putShort(0, Short.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+            parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 5 byte(s) available", msg);
+    }
+
+    @Test
+    public void testInvalidSaslAuthenticateRequest() {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        byte[] b = new byte[] {
+            0x11, 0x1f, 0x15, 0x2c,
+            0x5e, 0x2a, 0x20, 0x26,
+            0x6c, 0x39, 0x45, 0x1f,
+            0x25, 0x1c, 0x2d, 0x25,
+            0x43, 0x2a, 0x11, 0x76
+        };
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(b);
+        AbstractRequest request = new SaslAuthenticateRequest(data, version);
+        ByteBuffer serializedBytes = request.serialize();
+
+        // corrupt the length of the bytes array
+        serializedBytes.putInt(0, Integer.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+                parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 20 byte(s) available", msg);
+    }
+
+    @Test
+    public void testValidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[11];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        //construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        //write total numbers of tags
+        accessor.writeUnsignedVarint(1);
+
+        //write first tag
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(taggedField.size());
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) AbstractRequest.
+                parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer()).request;
+        Assertions.assertArrayEquals(authBytes, saslAuthenticateRequest.data().authBytes());
+        assertEquals(1, saslAuthenticateRequest.data().unknownTaggedFields().size());
+        assertEquals(taggedField, saslAuthenticateRequest.data().unknownTaggedFields().get(0));
+    }
+
+    @Test
+    public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[13];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        //construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        //write total numbers of tags
+        accessor.writeUnsignedVarint(1);
+
+        //write first tag
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(Short.MAX_VALUE); // set wrong size for tagged field
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+                parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
+    }
 }
diff --git a/clients/src/test/resources/common/message/SimpleArraysMessage.json b/clients/src/test/resources/common/message/SimpleArraysMessage.json
new file mode 100644
index 0000000000..76dc283b6a
--- /dev/null
+++ b/clients/src/test/resources/common/message/SimpleArraysMessage.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+{
+  "name": "SimpleArraysMessage",
+  "type": "header",
+  "validVersions": "0-2",
+  "flexibleVersions": "1+",
+  "fields": [
+    { "name": "Goats", "type": "[]StructArray", "versions": "1+",
+      "fields": [
+        { "name": "Color", "type": "int8", "versions": "1+"},
+        { "name": "Name", "type": "string", "versions": "2+"}
+      ]
+    },
+    { "name": "Sheep", "type": "[]int32", "versions": "0+" }
+  ]
+}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c25e551ad9..175a7ad0f9 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -280,11 +280,7 @@ object TestRaftServer extends Logging {
       out.writeByteArray(data)
     }
 
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val data = new Array[Byte](size)
-      input.readArray(data)
-      data
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   private class LatencyHistogram(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 08e7ea481e..6777d9469b 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -350,11 +350,7 @@ object KafkaMetadataLogTest {
     override def write(data: Array[Byte], serializationCache: ObjectSerializationCache, out: Writable): Unit = {
       out.writeByteArray(data)
     }
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val array = new Array[Byte](size)
-      input.readArray(array)
-      array
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   def buildMetadataLogAndDir(
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index 5e591c5214..838c79fb07 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -610,8 +610,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 buffer.printf("%s_readable.readByteBuffer(%s)%s",
                     assignmentPrefix, lengthVar, assignmentSuffix);
             } else {
-                buffer.printf("byte[] newBytes = new byte[%s];%n", lengthVar);
-                buffer.printf("_readable.readArray(newBytes);%n");
+                buffer.printf("byte[] newBytes = _readable.readArray(%s);%n", lengthVar);
                 buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
             }
         } else if (type.isRecords()) {
@@ -619,6 +618,12 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 assignmentPrefix, lengthVar, assignmentSuffix);
         } else if (type.isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
+            buffer.printf("if (%s > _readable.remaining()) {%n", lengthVar);
+            buffer.incrementIndent();
+            buffer.printf("throw new RuntimeException(\"Tried to allocate a collection of size \" + %s + \", but " +
+                    "there are only \" + _readable.remaining() + \" bytes remaining.\");%n", lengthVar);
+            buffer.decrementIndent();
+            buffer.printf("}%n");
             if (isStructArrayWithKeys) {
                 headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                 buffer.printf("%s newCollection = new %s(%s);%n",
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
index cf096dfe69..14f5bd63fd 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
@@ -40,8 +40,7 @@ public class StringSerde implements RecordSerde<String> {
 
     @Override
     public String read(Readable input, int size) {
-        byte[] data = new byte[size];
-        input.readArray(data);
+        byte[] data = input.readArray(size);
         return Utils.utf8(data);
     }
 


[kafka] 02/06: MINOR: Add configurable max receive size for SASL authentication requests

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b7dd40ff2bfb4e0cd726c1168e039d828daf113d
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon May 16 19:25:02 2022 +0530

    MINOR: Add configurable max receive size for SASL authentication requests
    
    This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>
    
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
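
A minimal sketch of how the new setting behaves (the class and method names below are hypothetical and the constants are duplicated here for a self-contained example; the real lookup happens in SaslServerAuthenticator, shown in the diff further down): the broker reads `sasl.server.max.receive.size` from its configs and falls back to the 512 KB default when the property is absent.

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical, self-contained illustration of the lookup-with-default logic.
    public class SaslReceiveSizeExample {
        static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
        static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288; // 512 KB

        static int resolveMaxReceiveSize(Map<String, ?> configs) {
            Integer configured = (Integer) configs.get(SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
            return configured != null ? configured : DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
        }

        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            System.out.println(resolveMaxReceiveSize(configs)); // 524288 (default)
            configs.put(SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, 1024);
            System.out.println(resolveMaxReceiveSize(configs)); // 1024
        }
    }

Receives larger than this limit fail with a SaslAuthenticationException and the connection is closed, as exercised by the new testSaslAuthenticationMaxReceiveSize test in the diff below.
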
---
 checkstyle/suppressions.xml                        |  2 +
 .../config/internals/BrokerSecurityConfigs.java    |  6 +++
 .../authenticator/SaslServerAuthenticator.java     | 16 ++++++--
 .../kafka/common/security/TestSecurityConfig.java  |  2 +
 .../authenticator/SaslAuthenticatorTest.java       | 46 ++++++++++++++++++++++
 .../authenticator/SaslServerAuthenticatorTest.java |  6 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 ++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +
 8 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 8f91d98738..8324e763b8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -32,6 +32,8 @@
               files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
+    <suppress checks="NPath"
+              files="SaslServerAuthenticator.java"/>
     <suppress checks="ClassFanOutComplexity"
               files="Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 3b84908fb1..840ac485f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -36,6 +36,8 @@ public class BrokerSecurityConfigs {
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
     public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
     public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
+    public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
@@ -90,4 +92,8 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." +
+            " Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
+            " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 243495da9f..f42d1ac719 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -88,8 +89,6 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 public class SaslServerAuthenticator implements Authenticator {
-    // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
-    static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     /**
@@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private String saslMechanism;
 
     // buffers used in `authenticate`
+    private Integer saslAuthRequestMaxReceiveSize;
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
     private Send authenticationFailureSend = null;
@@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements Authenticator {
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
         this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser, null);
+
+        saslAuthRequestMaxReceiveSize = (Integer) configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
+        if (saslAuthRequestMaxReceiveSize == null)
+            saslAuthRequestMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
     }
 
     private void createSaslServer(String mechanism) throws IOException {
@@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements Authenticator {
             }
 
             // allocate on heap (as opposed to any socket server memory pool)
-            if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+            if (netInBuffer == null) netInBuffer = new NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId);
 
-            netInBuffer.readFrom(transportLayer);
+            try {
+                netInBuffer.readFrom(transportLayer);
+            } catch (InvalidReceiveException e) {
+                throw new SaslAuthenticationException("Failing SASL authentication due to invalid receive size", e);
+            }
             if (!netInBuffer.complete())
                 return;
             netInBuffer.payload().rewind();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 07cbb7856d..197151f5fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig {
                     null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
             .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM,
                     BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
+            .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
+                    Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
             .withClientSslSupport()
             .withClientSaslSupport();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 60c8dbf484..937f484812 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -212,6 +212,52 @@ public class SaslAuthenticatorTest {
         checkAuthenticationAndReauthentication(securityProtocol, node);
     }
 
+    /**
+     * Test SASL/PLAIN with the SASL server max receive size config (SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG)
+     */
+    @Test
+    public void testSaslAuthenticationMaxReceiveSize() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+
+        // test auth with 1KB receive size
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024");
+        server = createEchoServer(securityProtocol);
+
+        // test valid sasl authentication
+        String node1 = "valid";
+        checkAuthenticationAndReauthentication(securityProtocol, node1);
+
+        // test a handshake request with a large mechanism string
+        byte[] bytes = new byte[1024];
+        new Random().nextBytes(bytes);
+        String mechanism = new String(bytes, StandardCharsets.UTF_8);
+        String node2 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), "someclient", nextCorrelationId++);
+        NetworkSend send = new NetworkSend(node2, handshakeRequest.toSend(header));
+        selector.send(send);
+        // the server throws an exception and closes the connection
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
+        selector.close();
+
+        String node3 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node3);
+        sendHandshakeRequestReceiveResponse(node3, ApiKeys.SASL_HANDSHAKE.latestVersion());
+
+        // test a SASL authenticate request with a large auth_bytes payload
+        String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" +  new String(bytes, StandardCharsets.UTF_8);
+        ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString));
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+        SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build();
+        header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, request.version(), "someclient", nextCorrelationId++);
+        send = new NetworkSend(node3, request.toSend(header));
+        selector.send(send);
+        NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.READY.state());
+        server.verifyAuthenticationMetrics(1, 2);
+    }
+
     /**
      * Tests that SASL/PLAIN clients with invalid password fail authentication.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index af0fedd4f5..8245f57516 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -19,11 +19,11 @@ package org.apache.kafka.common.security.authenticator;
 import java.net.InetAddress;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
-import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -68,10 +68,10 @@ public class SaslServerAuthenticatorTest {
             SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
 
         when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
-            invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+            invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE + 1);
             return 4;
         });
-        assertThrows(InvalidReceiveException.class, authenticator::authenticate);
+        assertThrows(SaslAuthenticationException.class, authenticator::authenticate);
         verify(transportLayer).read(any(ByteBuffer.class));
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index f1158bba4e..8d5d12a1e9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -249,6 +249,7 @@ object Defaults {
 
     /** ********* General Security configuration ***********/
   val ConnectionsMaxReauthMsDefault = 0L
+  val DefaultServerMaxMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
 
   /** ********* Sasl configuration ***********/
   val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -539,6 +540,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
   val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+  val SaslServerMaxReceiveSizeProp = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG
   val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
 
   /** ********* SSL Configuration ****************/
@@ -963,6 +965,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
   val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+  val SaslServerMaxReceiveSizeDoc = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC
   val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
 
   /** ********* SSL Configuration ****************/
@@ -1247,6 +1250,7 @@ object KafkaConfig {
 
       /** ********* General Security Configuration ****************/
       .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+      .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc)
       .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
 
       /** ********* SSL Configuration ****************/
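
Operationally the new broker property is set like any other. A hedged example follows; the literal key string is an assumption derived from the constant name SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, so check BrokerSecurityConfigs in the release you run for the exact key and default:

    import java.util.Properties;

    public class BrokerPropsSketch {
        public static void main(String[] args) {
            Properties brokerProps = new Properties();
            // Key string assumed from SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG; 131072 (128 KiB)
            // is an arbitrary example value, the compiled-in default is
            // DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE (524288).
            brokerProps.put("sasl.server.max.receive.size", "131072");
            System.out.println(brokerProps);
        }
    }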
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1f803b6476..014a123651 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -820,6 +820,8 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
+
         // Raft Quorum Configs
         case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
         case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
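
The KafkaConfigTest case added above leans on standard ConfigDef type validation: a value that cannot be parsed as an INT is rejected when the config is constructed. A self-contained sketch of that behaviour against the public ConfigDef API (the key string and doc text here are illustrative):

    import java.util.Collections;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.common.config.ConfigException;

    public class IntConfigValidationSketch {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                    .define("sasl.server.max.receive.size", ConfigDef.Type.INT, 524288,
                            ConfigDef.Importance.MEDIUM, "max receive size for SASL auth requests");
            try {
                // Non-numeric values fail INT coercion, mirroring assertPropertyInvalid above.
                def.parse(Collections.singletonMap("sasl.server.max.receive.size", "not_a_number"));
            } catch (ConfigException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }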


[kafka] 06/06: MINOR: Update version to 2.8.2

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 00acd559a8edac568a107933e07c6c3da71e17a4
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Sep 2 14:12:42 2022 +0530

    MINOR: Update version to 2.8.2
---
 docs/js/templateData.js                                                | 2 +-
 gradle.properties                                                      | 2 +-
 streams/quickstart/java/pom.xml                                        | 2 +-
 streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +-
 streams/quickstart/pom.xml                                             | 2 +-
 tests/kafkatest/__init__.py                                            | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 2347ee4e65..4e10a60c5f 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -19,6 +19,6 @@ limitations under the License.
 var context={
     "version": "28",
     "dotVersion": "2.8",
-    "fullDotVersion": "2.8.2-SNAPSHOT",
+    "fullDotVersion": "2.8.2",
     "scalaVersion": "2.13"
 };
diff --git a/gradle.properties b/gradle.properties
index bb3da5abe7..b107407905 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=2.8.2-SNAPSHOT
+version=2.8.2
 scalaVersion=2.13.5
 task=build
 org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
index 6e1576e990..d59ba32757 100644
--- a/streams/quickstart/java/pom.xml
+++ b/streams/quickstart/java/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.kafka</groupId>
         <artifactId>streams-quickstart</artifactId>
-        <version>2.8.2-SNAPSHOT</version>
+        <version>2.8.2</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
index 6f00851f6d..3b289fa797 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -29,7 +29,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <kafka.version>2.8.2-SNAPSHOT</kafka.version>
+        <kafka.version>2.8.2</kafka.version>
         <slf4j.version>1.7.7</slf4j.version>
         <log4j.version>1.2.17</log4j.version>
     </properties>
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
index c8049ad5e2..b29f2ad9ea 100644
--- a/streams/quickstart/pom.xml
+++ b/streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.kafka</groupId>
     <artifactId>streams-quickstart</artifactId>
     <packaging>pom</packaging>
-    <version>2.8.2-SNAPSHOT</version>
+    <version>2.8.2</version>
 
     <name>Kafka Streams :: Quickstart</name>
 
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index cbb6048154..c769c3702b 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
 # Instead, in development branches, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0"
-__version__ = '2.8.2.dev0'
+__version__ = '2.8.2'


[kafka] 04/06: MINOR: Update LICENSE for 2.8.2

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7ef72abb4d5ea65405d6115a328cf2c704de2c2d
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Fri Sep 2 14:01:42 2022 +0530

     MINOR: Update LICENSE for 2.8.2
---
 LICENSE-binary | 37 +++++++++++++++++++------------------
 1 file changed, 19 insertions(+), 18 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index d58b01716a..7d885849c6 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -218,32 +218,33 @@ jackson-jaxrs-json-provider-2.10.5
 jackson-module-jaxb-annotations-2.10.5
 jackson-module-paranamer-2.10.5
 jackson-module-scala_2.13-2.10.5
+jackson-module-scala_2.12-2.10.5
 jakarta.validation-api-2.0.2
 javassist-3.27.0-GA
-jetty-client-9.4.43.v20210629
-jetty-continuation-9.4.43.v20210629
-jetty-http-9.4.43.v20210629
-jetty-io-9.4.43.v20210629
-jetty-security-9.4.43.v20210629
-jetty-server-9.4.43.v20210629
-jetty-servlet-9.4.43.v20210629
-jetty-servlets-9.4.43.v20210629
-jetty-util-9.4.43.v20210629
-jetty-util-ajax-9.4.43.v20210629
+jetty-client-9.4.48.v20220622
+jetty-continuation-9.4.48.v20220622
+jetty-http-9.4.48.v20220622
+jetty-io-9.4.48.v20220622
+jetty-security-9.4.48.v20220622
+jetty-server-9.4.48.v20220622
+jetty-servlet-9.4.48.v20220622
+jetty-servlets-9.4.48.v20220622
+jetty-util-9.4.48.v20220622
+jetty-util-ajax-9.4.48.v20220622
 jersey-common-2.34
 jersey-server-2.34
 log4j-1.2.17
 lz4-java-1.7.1
 maven-artifact-3.8.1
 metrics-core-2.2.0
-netty-buffer-4.1.62.Final
-netty-codec-4.1.62.Final
-netty-common-4.1.62.Final
-netty-handler-4.1.62.Final
-netty-resolver-4.1.62.Final
-netty-transport-4.1.62.Final
-netty-transport-native-epoll-4.1.62.Final
-netty-transport-native-unix-common-4.1.62.Final
+netty-buffer-4.1.73.Final
+netty-codec-4.1.73.Final
+netty-common-4.1.73.Final
+netty-handler-4.1.73.Final
+netty-resolver-4.1.73.Final
+netty-transport-4.1.73.Final
+netty-transport-native-epoll-4.1.73.Final
+netty-transport-native-unix-common-4.1.73.Final
 plexus-utils-3.2.1
 rocksdbjni-5.18.4
 scala-collection-compat_2.13-2.3.0