Posted to commits@kafka.apache.org by to...@apache.org on 2022/09/02 13:37:13 UTC

[kafka] branch 3.0 updated (967b89f786 -> 1142808787)

This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a change to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 967b89f786 MINOR: Add note on IDEMPOTENT_WRITE ACL to notable changes (#12260)
     new aaceb6b79b MINOR: Add more validation during KRPC deserialization
     new 65a1e0451f MINOR: Add configurable max receive size for SASL authentication requests
     new 17f695c4c2 MINOR: Update version to 3.0.2
     new 110a640f76 MINOR: Update docs/upgrade.html
     new 286eceea7d MINOR: Update LICENSE-binary
     new 1142808787 KAFKA-10712; Update release scripts to Python3 (#11538)

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 LICENSE-binary                                     |  20 +--
 checkstyle/suppressions.xml                        |   6 +
 .../config/internals/BrokerSecurityConfigs.java    |   6 +
 .../kafka/common/protocol/ByteBufferAccessor.java  |   9 +-
 .../common/protocol/DataInputStreamReadable.java   | 139 ---------------------
 .../org/apache/kafka/common/protocol/Readable.java |   8 +-
 .../apache/kafka/common/record/DefaultRecord.java  |   2 +
 .../authenticator/SaslServerAuthenticator.java     |  16 ++-
 .../common/message/SimpleArraysMessageTest.java    |  54 ++++++++
 .../common/protocol/ByteBufferAccessorTest.java    |  58 +++++++++
 .../kafka/common/record/DefaultRecordTest.java     |  14 +++
 .../kafka/common/requests/RequestContextTest.java  |  83 ++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  92 ++++++++++++++
 .../kafka/common/security/TestSecurityConfig.java  |   2 +
 .../authenticator/SaslAuthenticatorTest.java       |  46 +++++++
 .../authenticator/SaslServerAuthenticatorTest.java |   6 +-
 .../common/message/SimpleArraysMessage.json}       |  15 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   4 +
 .../main/scala/kafka/tools/TestRaftServer.scala    |   6 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |   6 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   2 +
 docs/js/templateData.js                            |   2 +-
 docs/upgrade.html                                  |  55 ++++++++
 .../apache/kafka/message/MessageDataGenerator.java |   9 +-
 gradle.properties                                  |   2 +-
 .../kafka/raft/internals/RecordsIterator.java      | 105 +++++++++++-----
 .../apache/kafka/raft/internals/StringSerde.java   |   3 +-
 release.py                                         |  49 ++++----
 release_notes.py                                   |  29 ++---
 streams/quickstart/java/pom.xml                    |   2 +-
 .../src/main/resources/archetype-resources/pom.xml |   2 +-
 streams/quickstart/pom.xml                         |   2 +-
 tests/kafkatest/__init__.py                        |   2 +-
 33 files changed, 601 insertions(+), 255 deletions(-)
 delete mode 100644 clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
 create mode 100644 clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
 copy clients/src/{main/resources/common/message/ResponseHeader.json => test/resources/common/message/SimpleArraysMessage.json} (71%)


[kafka] 06/06: KAFKA-10712; Update release scripts to Python3 (#11538)

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 114280878709833410981c986bc4f76a34a008af
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Nov 29 10:42:14 2021 +0100

    KAFKA-10712; Update release scripts to Python3 (#11538)
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 release.py       | 49 ++++++++++++++++++++++++-------------------------
 release_notes.py | 29 +++++++++++++++--------------
 2 files changed, 39 insertions(+), 39 deletions(-)
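
In short, the patch below mechanically applies the standard Python 2 to 3 conversions: basestring becomes str, raw_input becomes input, print statements become print() calls, and the bytes returned by subprocess must be explicitly decoded. A minimal sketch of the recurring idioms, for orientation only (these helper names are invented, not code from the commit):

    import subprocess

    def run(cmd_args):
        # Python 3: subprocess returns bytes, so decode before treating it as text.
        output = subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
        return output.decode('utf-8')

    def is_command_string(cmd_arg):
        # Python 2's basestring is gone; plain str covers all text in Python 3.
        return isinstance(cmd_arg, str)

    def confirm(msg):
        # raw_input() was renamed to input() in Python 3.
        return input(msg).strip().lower() == 'y'

    # Writing text to a binary file handle now needs an explicit encode:
    #     tmpfile.write("abcdefg".encode('utf-8'))
    # Note also that Python 3 spells octal literals as 0o755; a bare 755 is decimal.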

diff --git a/release.py b/release.py
index f43e244b0e..456d60cd56 100755
--- a/release.py
+++ b/release.py
@@ -51,8 +51,6 @@ release.py release-email
 
 """
 
-from __future__ import print_function
-
 import datetime
 from getpass import getpass
 import json
@@ -98,25 +96,25 @@ def print_output(output):
         print(">", line)
 
 def cmd(action, cmd_arg, *args, **kwargs):
-    if isinstance(cmd_arg, basestring) and not kwargs.get("shell", False):
+    if isinstance(cmd_arg, str) and not kwargs.get("shell", False):
         cmd_arg = cmd_arg.split()
     allow_failure = kwargs.pop("allow_failure", False)
     num_retries = kwargs.pop("num_retries", 0)
 
     stdin_log = ""
-    if "stdin" in kwargs and isinstance(kwargs["stdin"], basestring):
+    if "stdin" in kwargs and isinstance(kwargs["stdin"], str):
         stdin_log = "--> " + kwargs["stdin"]
         stdin = tempfile.TemporaryFile()
-        stdin.write(kwargs["stdin"])
+        stdin.write(kwargs["stdin"].encode('utf-8'))
         stdin.seek(0)
         kwargs["stdin"] = stdin
 
     print(action, cmd_arg, stdin_log)
     try:
         output = subprocess.check_output(cmd_arg, *args, stderr=subprocess.STDOUT, **kwargs)
-        print_output(output)
+        print_output(output.decode('utf-8'))
     except subprocess.CalledProcessError as e:
-        print_output(e.output)
+        print_output(e.output.decode('utf-8'))
 
         if num_retries > 0:
             kwargs['num_retries'] = num_retries - 1
@@ -136,9 +134,9 @@ def cmd(action, cmd_arg, *args, **kwargs):
 
 
 def cmd_output(cmd, *args, **kwargs):
-    if isinstance(cmd, basestring):
+    if isinstance(cmd, str):
         cmd = cmd.split()
-    return subprocess.check_output(cmd, *args, stderr=subprocess.STDOUT, **kwargs)
+    return subprocess.check_output(cmd, *args, stderr=subprocess.STDOUT, **kwargs).decode('utf-8')
 
 def replace(path, pattern, replacement):
     updated = []
@@ -161,7 +159,7 @@ def regexReplace(path, pattern, replacement):
             f.write(line)
 
 def user_ok(msg):
-    ok = raw_input(msg)
+    ok = input(msg)
     return ok.strip().lower() == 'y'
 
 def sftp_mkdir(dir):
@@ -204,13 +202,14 @@ def get_jdk(prefs, version):
     """
     Get settings for the specified JDK version.
     """
-    jdk_java_home = get_pref(prefs, 'jdk%d' % version, lambda: raw_input("Enter the path for JAVA_HOME for a JDK%d compiler (blank to use default JAVA_HOME): " % version))
+    jdk_java_home = get_pref(prefs, 'jdk%d' % version, lambda: input("Enter the path for JAVA_HOME for a JDK%d compiler (blank to use default JAVA_HOME): " % version))
     jdk_env = dict(os.environ) if jdk_java_home.strip() else None
     if jdk_env is not None: jdk_env['JAVA_HOME'] = jdk_java_home
-    javaVersion = cmd_output("%s/bin/java -version" % jdk_java_home, env=jdk_env)
-    if version == 8 and "1.8.0" not in javaVersion:
-      fail("JDK 8 is required")
-    elif "%d.0" % version not in javaVersion:
+    java_version = cmd_output("%s/bin/java -version" % jdk_java_home, env=jdk_env)
+    if version == 8:
+      if "1.8.0" not in java_version:
+        fail("JDK 8 is required")
+    elif "%d.0" % version not in java_version and '"%d"' % version not in java_version:
       fail("JDK %s is required" % version)
     return jdk_env
 
@@ -271,7 +270,7 @@ def command_stage_docs():
 
     versioned_docs_path = os.path.join(kafka_site_repo_path, docs_version(version))
     if not os.path.exists(versioned_docs_path):
-        os.mkdir(versioned_docs_path, 0755)
+        os.mkdir(versioned_docs_path, 755)
 
     # The contents of the docs jar are site-docs/<docs dir>. We need to get rid of the site-docs prefix and dump everything
     # inside it into the docs version subdirectory in the kafka-site repo
@@ -309,16 +308,16 @@ def command_release_announcement_email():
     release_tags = sorted([t for t in tags if re.match(release_tag_pattern, t)])
     release_version_num = release_tags[-1]
     if not user_ok("""Is the current release %s ? (y/n): """ % release_version_num):
-        release_version_num = raw_input('What is the current release version:')
+        release_version_num = input('What is the current release version:')
         validate_release_num(release_version_num)
     previous_release_version_num = release_tags[-2]
     if not user_ok("""Is the previous release %s ? (y/n): """ % previous_release_version_num):
-        previous_release_version_num = raw_input('What is the previous release version:')
+        previous_release_version_num = input('What is the previous release version:')
         validate_release_num(previous_release_version_num)
     if release_version_num < previous_release_version_num :
         fail("Current release version number can't be less than previous release version number")
-    number_of_contributors = int(subprocess.check_output('git shortlog -sn --no-merges %s..%s | wc -l' % (previous_release_version_num, release_version_num) , shell=True))
-    contributors = subprocess.check_output("git shortlog -sn --no-merges %s..%s | cut -f2 | sort --ignore-case" % (previous_release_version_num, release_version_num), shell=True)
+    number_of_contributors = int(subprocess.check_output('git shortlog -sn --no-merges %s..%s | wc -l' % (previous_release_version_num, release_version_num) , shell=True).decode('utf-8'))
+    contributors = subprocess.check_output("git shortlog -sn --no-merges %s..%s | cut -f2 | sort --ignore-case" % (previous_release_version_num, release_version_num), shell=True).decode('utf-8')
     release_announcement_data = {
         'number_of_contributors': number_of_contributors,
         'contributors': ', '.join(str(x) for x in filter(None, contributors.split('\n'))),
@@ -481,10 +480,10 @@ starting_branch = cmd_output('git rev-parse --abbrev-ref HEAD')
 cmd("Verifying that you have no unstaged git changes", 'git diff --exit-code --quiet')
 cmd("Verifying that you have no staged git changes", 'git diff --cached --exit-code --quiet')
 
-release_version = raw_input("Release version (without any RC info, e.g. 1.0.0): ")
+release_version = input("Release version (without any RC info, e.g. 1.0.0): ")
 release_version_parts = get_release_version_parts(release_version)
 
-rc = raw_input("Release candidate number: ")
+rc = input("Release candidate number: ")
 
 dev_branch = '.'.join(release_version_parts[:2])
 docs_release_version = docs_version(release_version)
@@ -508,7 +507,7 @@ if not rc:
     sys.exit(0)
 
 # Prereq checks
-apache_id = get_pref(prefs, 'apache_id', lambda: raw_input("Enter your apache username: "))
+apache_id = get_pref(prefs, 'apache_id', lambda: input("Enter your apache username: "))
 
 jdk8_env = get_jdk(prefs, 8)
 jdk15_env = get_jdk(prefs, 15)
@@ -517,7 +516,7 @@ def select_gpg_key():
     print("Here are the available GPG keys:")
     available_keys = cmd_output("gpg --list-secret-keys")
     print(available_keys)
-    key_name = raw_input("Which user name (enter the user name without email address): ")
+    key_name = input("Which user name (enter the user name without email address): ")
     if key_name not in available_keys:
         fail("Couldn't find the requested key.")
     return key_name
@@ -527,7 +526,7 @@ key_name = get_pref(prefs, 'gpg-key', select_gpg_key)
 gpg_passphrase = get_pref(prefs, 'gpg-pass', lambda: getpass("Passphrase for this GPG key: "))
 # Do a quick validation so we can fail fast if the password is incorrect
 with tempfile.NamedTemporaryFile() as gpg_test_tempfile:
-    gpg_test_tempfile.write("abcdefg")
+    gpg_test_tempfile.write("abcdefg".encode('utf-8'))
     cmd("Testing GPG key & passphrase", ["gpg", "--batch", "--pinentry-mode", "loopback", "--passphrase-fd", "0", "-u", key_name, "--armor", "--output", gpg_test_tempfile.name + ".asc", "--detach-sig", gpg_test_tempfile.name], stdin=gpg_passphrase)
 
 save_prefs(prefs)
diff --git a/release_notes.py b/release_notes.py
index 029edcbacf..e44c74d5b2 100755
--- a/release_notes.py
+++ b/release_notes.py
@@ -28,7 +28,7 @@ from jira import JIRA
 import itertools, sys
 
 if len(sys.argv) < 2:
-    print >>sys.stderr, "Usage: release_notes.py <version>"
+    print("Usage: release_notes.py <version>", file=sys.stderr)
     sys.exit(1)
 
 version = sys.argv[1]
@@ -58,7 +58,7 @@ if __name__ == "__main__":
     apache = JIRA(JIRA_BASE_URL)
     issues = get_issues(apache, 'project=KAFKA and fixVersion=%s' % version)
     if not issues:
-        print >>sys.stderr, "Didn't find any issues for the target fix version"
+        print("Didn't find any issues for the target fix version", file=sys.stderr)
         sys.exit(1)
 
     # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we shouldn't even be able to create a release until they are fixed
@@ -78,11 +78,11 @@ if __name__ == "__main__":
                               ]
     unresolved_issues = [issue for issue in issues if issue.fields.resolution in UNRESOLVED_RESOLUTIONS or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS]
     if unresolved_issues:
-        print >>sys.stderr, "The release is not completed since unresolved issues or improperly resolved issues were found still tagged with this release as the fix version:"
+        print("The release is not completed since unresolved issues or improperly resolved issues were found still tagged with this release as the fix version:", file=sys.stderr)
         for issue in unresolved_issues:
-            print >>sys.stderr, "Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue))
-        print >>sys.stderr
-        print >>sys.stderr, "Note that for some resolutions, you should simply remove the fix version as they have not been truly fixed in this release."
+            print("Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue)), file=sys.stderr)
+        print("", file=sys.stderr)
+        print("Note that for some resolutions, you should simply remove the fix version as they have not been truly fixed in this release.", file=sys.stderr)
         sys.exit(1)
 
     # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so they
@@ -93,11 +93,12 @@ if __name__ == "__main__":
             return -2
         if issue.fields.issuetype.name == 'Improvement':
             return -1
-        return issue.fields.issuetype.id
+        return int(issue.fields.issuetype.id)
+
     by_group = [(k,sorted(g, key=lambda issue: issue.id)) for k,g in itertools.groupby(sorted(issues, key=issue_type_key), lambda issue: issue.fields.issuetype.name)]
 
-    print "<h1>Release Notes - Kafka - Version %s</h1>" % version
-    print """<p>Below is a summary of the JIRA issues addressed in the %(version)s release of Kafka. For full documentation of the
+    print("<h1>Release Notes - Kafka - Version %s</h1>" % version)
+    print("""<p>Below is a summary of the JIRA issues addressed in the %(version)s release of Kafka. For full documentation of the
     release, a guide to get started, and information about the project, see the <a href="https://kafka.apache.org/">Kafka
     project site</a>.</p>
 
@@ -107,10 +108,10 @@ if __name__ == "__main__":
     changes, performance changes, and any other changes that might impact your production deployment of Kafka.</p>
 
     <p>The documentation for the most recent release can be found at
-    <a href="https://kafka.apache.org/documentation.html">https://kafka.apache.org/documentation.html</a>.</p>""" % { 'version': version, 'minor': minor_version_dotless }
+    <a href="https://kafka.apache.org/documentation.html">https://kafka.apache.org/documentation.html</a>.</p>""" % { 'version': version, 'minor': minor_version_dotless })
     for itype, issues in by_group:
-        print "<h2>%s</h2>" % itype
-        print "<ul>"
+        print("<h2>%s</h2>" % itype)
+        print("<ul>")
         for issue in issues:
-            print '<li>[<a href="%(link)s">%(key)s</a>] - %(summary)s</li>' % {'key': issue.key, 'link': issue_link(issue), 'summary': issue.fields.summary}
-        print "</ul>"
+            print('<li>[<a href="%(link)s">%(key)s</a>] - %(summary)s</li>' % {'key': issue.key, 'link': issue_link(issue), 'summary': issue.fields.summary})
+        print("</ul>")


[kafka] 01/06: MINOR: Add more validation during KRPC deserialization

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aaceb6b79bfcb1d32874ccdbc8f3138d1c1c00fb
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Fri May 20 15:23:12 2022 -0700

    MINOR: Add more validation during KRPC deserialization
    
    When deserializing KRPC (which is used for RPCs sent to Kafka, Kafka Metadata records, and some
    other things), check that we have at least N bytes remaining before allocating an array of size N.
    
    Remove DataInputStreamReadable since it was hard to make this class aware of how many bytes were
    remaining. Instead, when reading an individual record in the Raft layer, simply create a
    ByteBufferAccessor with a ByteBuffer containing just the bytes we're interested in.
    
    Add SimpleArraysMessageTest and ByteBufferAccessorTest. Also add some additional tests in
    RequestResponseTest.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>, Colin McCabe <co...@cmccabe.xyz>
    
    Co-authored-by: Colin McCabe <co...@cmccabe.xyz>
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
---
 checkstyle/suppressions.xml                        |   4 +
 .../kafka/common/protocol/ByteBufferAccessor.java  |   9 +-
 .../common/protocol/DataInputStreamReadable.java   | 139 ---------------------
 .../org/apache/kafka/common/protocol/Readable.java |   8 +-
 .../apache/kafka/common/record/DefaultRecord.java  |   2 +
 .../common/message/SimpleArraysMessageTest.java    |  54 ++++++++
 .../common/protocol/ByteBufferAccessorTest.java    |  58 +++++++++
 .../kafka/common/record/DefaultRecordTest.java     |  14 +++
 .../kafka/common/requests/RequestContextTest.java  |  83 ++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  92 ++++++++++++++
 .../common/message/SimpleArraysMessage.json        |  29 +++++
 .../main/scala/kafka/tools/TestRaftServer.scala    |   6 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |   6 +-
 .../apache/kafka/message/MessageDataGenerator.java |   9 +-
 .../kafka/raft/internals/RecordsIterator.java      | 105 +++++++++++-----
 .../apache/kafka/raft/internals/StringSerde.java   |   3 +-
 16 files changed, 434 insertions(+), 187 deletions(-)
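
The core of the fix is a pre-allocation bounds check: before allocating an array or collection whose length was read off the wire, verify that at least that many bytes actually remain in the buffer, so a corrupt or malicious length prefix cannot force a huge allocation. A minimal sketch of the pattern, simplified from the ByteBufferAccessor change in the diff below (the standalone class here is illustrative, not part of the commit):

    import java.nio.ByteBuffer;

    public class BoundsCheckedReader {
        private final ByteBuffer buf;

        public BoundsCheckedReader(ByteBuffer buf) {
            this.buf = buf;
        }

        // Check the claimed size against the bytes remaining before allocating.
        public byte[] readArray(int size) {
            int remaining = buf.remaining();
            if (size > remaining) {
                throw new RuntimeException("Error reading byte array of " + size
                        + " byte(s): only " + remaining + " byte(s) available");
            }
            byte[] arr = new byte[size];
            buf.get(arr);
            return arr;
        }
    }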

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index b7a7192754..69c9374f5b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -150,6 +150,10 @@
     <suppress checks="JavaNCSS"
               files="DistributedHerderTest.java"/>
 
+    <!-- Raft -->
+    <suppress checks="NPathComplexity"
+              files="RecordsIterator.java"/>
+
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
               files="(KafkaStreams|KStreamImpl|KTableImpl|StreamsPartitionAssignor).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
index bd0925d6db..f643f5b577 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java
@@ -54,8 +54,15 @@ public class ByteBufferAccessor implements Readable, Writable {
     }
 
     @Override
-    public void readArray(byte[] arr) {
+    public byte[] readArray(int size) {
+        int remaining = buf.remaining();
+        if (size > remaining) {
+            throw new RuntimeException("Error reading byte array of " + size + " byte(s): only " + remaining +
+                    " byte(s) available");
+        }
+        byte[] arr = new byte[size];
         buf.get(arr);
+        return arr;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java b/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
deleted file mode 100644
index 70ed52d6a0..0000000000
--- a/clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.apache.kafka.common.utils.ByteUtils;
-
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-public class DataInputStreamReadable implements Readable, Closeable {
-    protected final DataInputStream input;
-
-    public DataInputStreamReadable(DataInputStream input) {
-        this.input = input;
-    }
-
-    @Override
-    public byte readByte() {
-        try {
-            return input.readByte();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public short readShort() {
-        try {
-            return input.readShort();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public int readInt() {
-        try {
-            return input.readInt();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public long readLong() {
-        try {
-            return input.readLong();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public double readDouble() {
-        try {
-            return input.readDouble();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void readArray(byte[] arr) {
-        try {
-            input.readFully(arr);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public int readUnsignedVarint() {
-        try {
-            return ByteUtils.readUnsignedVarint(input);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public ByteBuffer readByteBuffer(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
-        return ByteBuffer.wrap(arr);
-    }
-
-    @Override
-    public int readVarint() {
-        try {
-            return ByteUtils.readVarint(input);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public long readVarlong() {
-        try {
-            return ByteUtils.readVarlong(input);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public int remaining() {
-        try {
-            return input.available();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void close() {
-        try {
-            input.close();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
index 9c9e461ca8..f453d12e17 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Readable.java
@@ -32,7 +32,7 @@ public interface Readable {
     int readInt();
     long readLong();
     double readDouble();
-    void readArray(byte[] arr);
+    byte[] readArray(int length);
     int readUnsignedVarint();
     ByteBuffer readByteBuffer(int length);
     int readVarint();
@@ -40,8 +40,7 @@ public interface Readable {
     int remaining();
 
     default String readString(int length) {
-        byte[] arr = new byte[length];
-        readArray(arr);
+        byte[] arr = readArray(length);
         return new String(arr, StandardCharsets.UTF_8);
     }
 
@@ -49,8 +48,7 @@ public interface Readable {
         if (unknowns == null) {
             unknowns = new ArrayList<>();
         }
-        byte[] data = new byte[size];
-        readArray(data);
+        byte[] data = readArray(size);
         unknowns.add(new RawTaggedField(tag, data));
         return unknowns;
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 8772556b1d..b2235fef49 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -342,6 +342,8 @@ public class DefaultRecord implements Record {
             int numHeaders = ByteUtils.readVarint(buffer);
             if (numHeaders < 0)
                 throw new InvalidRecordException("Found invalid number of record headers " + numHeaders);
+            if (numHeaders > buffer.remaining())
+                throw new InvalidRecordException("Found invalid number of record headers. " + numHeaders + " is larger than the remaining size of the buffer");
 
             final Header[] headers;
             if (numHeaders == 0)
diff --git a/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
new file mode 100644
index 0000000000..1b78adbb96
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/message/SimpleArraysMessageTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.message;
+
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SimpleArraysMessageTest {
+    @Test
+    public void testArrayBoundsChecking() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x7f, // Set size of first array to 126 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 126, but there are only 7 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+
+    @Test
+    public void testArrayBoundsCheckingOtherArray() {
+        // SimpleArraysMessageData takes 2 arrays
+        final ByteBuffer buf = ByteBuffer.wrap(new byte[] {
+            (byte) 0x01, // Set size of first array to 0
+            (byte) 0x7e, // Set size of second array to 125 which is larger than the size of this buffer
+            (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00, (byte) 0x00
+        });
+        final SimpleArraysMessageData out = new SimpleArraysMessageData();
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        assertEquals("Tried to allocate a collection of size 125, but there are only 6 bytes remaining.",
+                assertThrows(RuntimeException.class, () -> out.read(accessor, (short) 2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
new file mode 100644
index 0000000000..6a0c6c2681
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ByteBufferAccessorTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ByteBufferAccessorTest {
+    @Test
+    public void testReadArray() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        final byte[] testArray = new byte[] {0x4b, 0x61, 0x46};
+        accessor.writeByteArray(testArray);
+        accessor.writeInt(12345);
+        accessor.flip();
+        final byte[] testArray2 = accessor.readArray(3);
+        assertArrayEquals(testArray, testArray2);
+        assertEquals(12345, accessor.readInt());
+        assertEquals("Error reading byte array of 3 byte(s): only 0 byte(s) available",
+            assertThrows(RuntimeException.class,
+                () -> accessor.readArray(3)).getMessage());
+    }
+
+    @Test
+    public void testReadString() {
+        ByteBuffer buf = ByteBuffer.allocate(1024);
+        ByteBufferAccessor accessor = new ByteBufferAccessor(buf);
+        String testString = "ABC";
+        final byte[] testArray = testString.getBytes(StandardCharsets.UTF_8);
+        accessor.writeByteArray(testArray);
+        accessor.flip();
+        assertEquals("ABC", accessor.readString(3));
+        assertEquals("Error reading byte array of 2 byte(s): only 0 byte(s) available",
+                assertThrows(RuntimeException.class,
+                        () -> accessor.readString(2)).getMessage());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 49743d2320..67212165fc 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -247,6 +247,20 @@ public class DefaultRecordTest {
         buf.flip();
         assertThrows(InvalidRecordException.class,
             () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
+
+        ByteBuffer buf2 = ByteBuffer.allocate(sizeOfBodyInBytes + ByteUtils.sizeOfVarint(sizeOfBodyInBytes));
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2);
+        buf2.put(attributes);
+        ByteUtils.writeVarlong(timestampDelta, buf2);
+        ByteUtils.writeVarint(offsetDelta, buf2);
+        ByteUtils.writeVarint(-1, buf2); // null key
+        ByteUtils.writeVarint(-1, buf2); // null value
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf2); // more headers than remaining buffer size, not allowed
+        buf2.position(buf2.limit());
+
+        buf2.flip();
+        assertThrows(InvalidRecordException.class,
+                () -> DefaultRecord.readFrom(buf2, 0L, 0L, RecordBatch.NO_SEQUENCE, null));
     }
 
     @Test
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
index 4415ff960a..254dea0430 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -16,22 +16,31 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ProduceRequestData;
+import org.apache.kafka.common.message.SaslAuthenticateRequestData;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class RequestContextTest {
@@ -104,4 +113,78 @@ public class RequestContextTest {
         assertEquals(expectedResponse, parsedResponse.data());
     }
 
+    @Test
+    public void testInvalidRequestForImplicitHashCollection() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the topics array
+        corruptBuffer.putInt(8, (Integer.MAX_VALUE - 1) / 2);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Tried to allocate a collection of size 1073741823, but there are only 17 bytes remaining.", msg);
+    }
+
+    @Test
+    public void testInvalidRequestForArrayList() throws UnknownHostException {
+        short version = (short) 5; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = produceRequest(version);
+        // corrupt the length of the partitions array
+        corruptBuffer.putInt(17, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.PRODUCE, version, "console-producer", 3);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals(
+                "Tried to allocate a collection of size 2147483647, but there are only 8 bytes remaining.", msg);
+    }
+
+    private ByteBuffer produceRequest(short version) {
+        ProduceRequestData data = new ProduceRequestData()
+                .setAcks((short) -1)
+                .setTimeoutMs(1);
+        data.topicData().add(
+                new ProduceRequestData.TopicProduceData()
+                        .setName("foo")
+                        .setPartitionData(Collections.singletonList(new ProduceRequestData.PartitionProduceData()
+                                .setIndex(42))));
+
+        return serialize(version, data);
+    }
+
+    private ByteBuffer serialize(short version, ApiMessage data) {
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        data.size(cache, version);
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        data.write(new ByteBufferAccessor(buffer), cache, version);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Test
+    public void testInvalidRequestForByteArray() throws UnknownHostException {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        ByteBuffer corruptBuffer = serialize(version, new SaslAuthenticateRequestData().setAuthBytes(new byte[0]));
+        // corrupt the length of the bytes array
+        corruptBuffer.putInt(0, Integer.MAX_VALUE);
+
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, version, "console-producer", 1);
+        RequestContext context = new RequestContext(header, "0", InetAddress.getLocalHost(),
+                KafkaPrincipal.ANONYMOUS, new ListenerName("ssl"), SecurityProtocol.SASL_SSL,
+                ClientInformation.EMPTY, true);
+
+        String msg = assertThrows(InvalidRequestException.class,
+                () -> context.parseRequest(corruptBuffer)).getCause().getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 0 byte(s) available", msg);
+    }
+
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 118a4244e2..679e5595df 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -184,6 +184,7 @@ import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.protocol.types.RawTaggedField;
 import org.apache.kafka.common.quota.ClientQuotaAlteration;
 import org.apache.kafka.common.quota.ClientQuotaEntity;
 import org.apache.kafka.common.quota.ClientQuotaFilter;
@@ -205,10 +206,12 @@ import org.apache.kafka.common.security.token.delegation.DelegationToken;
 import org.apache.kafka.common.security.token.delegation.TokenInformation;
 import org.apache.kafka.common.utils.SecurityUtils;
 import org.apache.kafka.common.utils.Utils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -234,6 +237,7 @@ import static org.apache.kafka.common.protocol.ApiKeys.LEADER_AND_ISR;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_GROUPS;
 import static org.apache.kafka.common.protocol.ApiKeys.LIST_OFFSETS;
 import static org.apache.kafka.common.protocol.ApiKeys.OFFSET_FETCH;
+import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.STOP_REPLICA;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
@@ -3047,4 +3051,92 @@ public class RequestResponseTest {
         return new ListTransactionsResponse(response);
     }
 
+    @Test
+    public void testInvalidSaslHandShakeRequest() {
+        AbstractRequest request = new SaslHandshakeRequest.Builder(
+                new SaslHandshakeRequestData().setMechanism("PLAIN")).build();
+        ByteBuffer serializedBytes = request.serialize();
+        // corrupt the length of the sasl mechanism string
+        serializedBytes.putShort(0, Short.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+            parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 5 byte(s) available", msg);
+    }
+
+    @Test
+    public void testInvalidSaslAuthenticateRequest() {
+        short version = (short) 1; // choose a version with fixed length encoding, for simplicity
+        byte[] b = new byte[] {
+            0x11, 0x1f, 0x15, 0x2c,
+            0x5e, 0x2a, 0x20, 0x26,
+            0x6c, 0x39, 0x45, 0x1f,
+            0x25, 0x1c, 0x2d, 0x25,
+            0x43, 0x2a, 0x11, 0x76
+        };
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(b);
+        AbstractRequest request = new SaslAuthenticateRequest(data, version);
+        ByteBuffer serializedBytes = request.serialize();
+
+        // corrupt the length of the bytes array
+        serializedBytes.putInt(0, Integer.MAX_VALUE);
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+                parseRequest(request.apiKey(), request.version(), serializedBytes)).getMessage();
+        assertEquals("Error reading byte array of 2147483647 byte(s): only 20 byte(s) available", msg);
+    }
+
+    @Test
+    public void testValidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[11];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        //construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        //write total numbers of tags
+        accessor.writeUnsignedVarint(1);
+
+        //write first tag
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(taggedField.size());
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        SaslAuthenticateRequest saslAuthenticateRequest = (SaslAuthenticateRequest) AbstractRequest.
+                parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer()).request;
+        Assertions.assertArrayEquals(authBytes, saslAuthenticateRequest.data().authBytes());
+        assertEquals(1, saslAuthenticateRequest.data().unknownTaggedFields().size());
+        assertEquals(taggedField, saslAuthenticateRequest.data().unknownTaggedFields().get(0));
+    }
+
+    @Test
+    public void testInvalidTaggedFieldsWithSaslAuthenticateRequest() {
+        byte[] byteArray = new byte[13];
+        ByteBufferAccessor accessor = new ByteBufferAccessor(ByteBuffer.wrap(byteArray));
+
+        //construct a SASL_AUTHENTICATE request
+        byte[] authBytes = "test".getBytes(StandardCharsets.UTF_8);
+        accessor.writeUnsignedVarint(authBytes.length + 1);
+        accessor.writeByteArray(authBytes);
+
+        //write total numbers of tags
+        accessor.writeUnsignedVarint(1);
+
+        //write first tag
+        RawTaggedField taggedField = new RawTaggedField(1, new byte[] {0x1, 0x2, 0x3});
+        accessor.writeUnsignedVarint(taggedField.tag());
+        accessor.writeUnsignedVarint(Short.MAX_VALUE); // set wrong size for tagged field
+        accessor.writeByteArray(taggedField.data());
+
+        accessor.flip();
+
+        String msg = assertThrows(RuntimeException.class, () -> AbstractRequest.
+                parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
+        assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
+    }
 }
diff --git a/clients/src/test/resources/common/message/SimpleArraysMessage.json b/clients/src/test/resources/common/message/SimpleArraysMessage.json
new file mode 100644
index 0000000000..76dc283b6a
--- /dev/null
+++ b/clients/src/test/resources/common/message/SimpleArraysMessage.json
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+{
+  "name": "SimpleArraysMessage",
+  "type": "header",
+  "validVersions": "0-2",
+  "flexibleVersions": "1+",
+  "fields": [
+    { "name": "Goats", "type": "[]StructArray", "versions": "1+",
+      "fields": [
+        { "name": "Color", "type": "int8", "versions": "1+"},
+        { "name": "Name", "type": "string", "versions": "2+"}
+      ]
+    },
+    { "name": "Sheep", "type": "[]int32", "versions": "0+" }
+  ]
+}
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 509913885e..c067f945c5 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -299,11 +299,7 @@ object TestRaftServer extends Logging {
       out.writeByteArray(data)
     }
 
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val data = new Array[Byte](size)
-      input.readArray(data)
-      data
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   private class LatencyHistogram(
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index e02529b33d..dcbe39d832 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -878,11 +878,7 @@ object KafkaMetadataLogTest {
     override def write(data: Array[Byte], serializationCache: ObjectSerializationCache, out: Writable): Unit = {
       out.writeByteArray(data)
     }
-    override def read(input: protocol.Readable, size: Int): Array[Byte] = {
-      val array = new Array[Byte](size)
-      input.readArray(array)
-      array
-    }
+    override def read(input: protocol.Readable, size: Int): Array[Byte] = input.readArray(size)
   }
 
   val DefaultMetadataLogConfig = MetadataLogConfig(
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index b9923ee572..dea3a03bce 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -610,8 +610,7 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 buffer.printf("%s_readable.readByteBuffer(%s)%s",
                     assignmentPrefix, lengthVar, assignmentSuffix);
             } else {
-                buffer.printf("byte[] newBytes = new byte[%s];%n", lengthVar);
-                buffer.printf("_readable.readArray(newBytes);%n");
+                buffer.printf("byte[] newBytes = _readable.readArray(%s);%n", lengthVar);
                 buffer.printf("%snewBytes%s", assignmentPrefix, assignmentSuffix);
             }
         } else if (type.isRecords()) {
@@ -619,6 +618,12 @@ public final class MessageDataGenerator implements MessageClassGenerator {
                 assignmentPrefix, lengthVar, assignmentSuffix);
         } else if (type.isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) type;
+            buffer.printf("if (%s > _readable.remaining()) {%n", lengthVar);
+            buffer.incrementIndent();
+            buffer.printf("throw new RuntimeException(\"Tried to allocate a collection of size \" + %s + \", but " +
+                    "there are only \" + _readable.remaining() + \" bytes remaining.\");%n", lengthVar);
+            buffer.decrementIndent();
+            buffer.printf("}%n");
             if (isStructArrayWithKeys) {
                 headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
                 buffer.printf("%s newCollection = new %s(%s);%n",
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index b36d4f1563..59bb436deb 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.raft.internals;
 
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
@@ -25,14 +26,16 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-import org.apache.kafka.common.protocol.DataInputStreamReadable;
-import org.apache.kafka.common.protocol.Readable;
+
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 
@@ -50,6 +53,13 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
     private int bytesRead = 0;
     private boolean isClosed = false;
 
+    /**
+     * This class provides an iterator over records retrieved via the raft client or from a snapshot
+     * @param records the records
+     * @param serde the serde to deserialize records
+     * @param bufferSupplier the buffer supplier implementation to allocate buffers when reading records. This must return ByteBuffer allocated on the heap
+     * @param batchSize the maximum batch size
+     */
     public RecordsIterator(
         Records records,
         RecordSerde<T> serde,
@@ -94,7 +104,7 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
 
     private void ensureOpen() {
         if (isClosed) {
-            throw new IllegalStateException("Serde record batch itererator was closed");
+            throw new IllegalStateException("Serde record batch iterator was closed");
         }
     }
 
@@ -196,11 +206,14 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
             }
 
             List<T> records = new ArrayList<>(numRecords);
-            try (DataInputStreamReadable input = new DataInputStreamReadable(batch.recordInputStream(bufferSupplier))) {
+            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
+            try {
                 for (int i = 0; i < numRecords; i++) {
-                    T record = readRecord(input);
+                    T record = readRecord(input, batch.sizeInBytes());
                     records.add(record);
                 }
+            } finally {
+                Utils.closeQuietly(input, "DataInputStream");
             }
 
             result = Batch.data(
@@ -215,38 +228,74 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
         return result;
     }
 
-    private T readRecord(Readable input) {
+    private T readRecord(DataInputStream stream, int totalBatchSize) {
         // Read size of body in bytes
-        input.readVarint();
+        int size;
+        try {
+            size = ByteUtils.readVarint(stream);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Unable to read record size", e);
+        }
+        if (size <= 0) {
+            throw new RuntimeException("Invalid non-positive frame size: " + size);
+        }
+        if (size > totalBatchSize) {
+            throw new RuntimeException("Specified frame size, " + size + ", is larger than the entire size of the " +
+                    "batch, which is " + totalBatchSize);
+        }
+        ByteBuffer buf = bufferSupplier.get(size);
 
-        // Read unused attributes
-        input.readByte();
+        // The last byte of the buffer is reserved for a varint set to the number of record headers, which
+        // must be 0. Therefore, we set the ByteBuffer limit to size - 1.
+        buf.limit(size - 1);
 
-        long timestampDelta = input.readVarlong();
-        if (timestampDelta != 0) {
-            throw new IllegalArgumentException();
+        try {
+            int bytesRead = stream.read(buf.array(), 0, size);
+            if (bytesRead != size) {
+                throw new RuntimeException("Unable to read " + size + " bytes, only read " + bytesRead);
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to read record bytes", e);
         }
+        try {
+            ByteBufferAccessor input = new ByteBufferAccessor(buf);
 
-        // Read offset delta
-        input.readVarint();
+            // Read unused attributes
+            input.readByte();
 
-        int keySize = input.readVarint();
-        if (keySize != -1) {
-            throw new IllegalArgumentException("Unexpected key size " + keySize);
-        }
+            long timestampDelta = input.readVarlong();
+            if (timestampDelta != 0) {
+                throw new IllegalArgumentException("Got timestamp delta of " + timestampDelta + ", but this is invalid because it " +
+                        "is not 0 as expected.");
+            }
 
-        int valueSize = input.readVarint();
-        if (valueSize < 0) {
-            throw new IllegalArgumentException();
-        }
+            // Read offset delta
+            input.readVarint();
 
-        T record = serde.read(input, valueSize);
+            int keySize = input.readVarint();
+            if (keySize != -1) {
+                throw new IllegalArgumentException("Got key size of " + keySize + ", but this is invalid because it " +
+                        "is not -1 as expected.");
+            }
 
-        int numHeaders = input.readVarint();
-        if (numHeaders != 0) {
-            throw new IllegalArgumentException();
-        }
+            int valueSize = input.readVarint();
+            if (valueSize < 1) {
+                throw new IllegalArgumentException("Got payload size of " + valueSize + ", but this is invalid because " +
+                        "it is less than 1.");
+            }
+
+            // Read the record body from the buffer using the configured serde
+            T record = serde.read(input, valueSize);
 
-        return record;
+            // Read the number of headers. Currently, this must be a single byte set to 0.
+            int numHeaders = buf.array()[size - 1];
+            if (numHeaders != 0) {
+                throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because " +
+                        "it is not 0 as expected.");
+            }
+            return record;
+        } finally {
+            bufferSupplier.release(buf);
+        }
     }
 }
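
A note for readers following the readRecord rewrite above: it applies a general
defensive-deserialization pattern -- read the varint length prefix, reject
non-positive sizes and sizes larger than the enclosing batch, and only then
allocate. A minimal standalone sketch of that pattern follows. BoundedFrameReader
and readFrame are invented names for illustration; ByteUtils.readVarint is the
same Kafka utility the patch calls, and the sketch uses DataInputStream#readFully
for the short-read check that the patch performs manually.

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.ByteBuffer;

    import org.apache.kafka.common.utils.ByteUtils;

    public final class BoundedFrameReader {
        /**
         * Reads one varint-length-prefixed frame, rejecting sizes that are
         * non-positive or larger than the enclosing batch.
         */
        public static ByteBuffer readFrame(DataInputStream in, int maxFrameSize) {
            final int size;
            try {
                size = ByteUtils.readVarint(in);   // varint length prefix
            } catch (IOException e) {
                throw new UncheckedIOException("Unable to read frame size", e);
            }
            if (size <= 0 || size > maxFrameSize) {
                throw new IllegalArgumentException("Invalid frame size: " + size);
            }
            byte[] body = new byte[size];          // safe: size is already bounded
            try {
                in.readFully(body);                // fails fast on truncated input
            } catch (IOException e) {
                throw new UncheckedIOException("Failed to read frame body", e);
            }
            return ByteBuffer.wrap(body);
        }
    }
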
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
index c2a011a687..15475be319 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/StringSerde.java
@@ -40,8 +40,7 @@ public class StringSerde implements RecordSerde<String> {
 
     @Override
     public String read(Readable input, int size) {
-        byte[] data = new byte[size];
-        input.readArray(data);
+        byte[] data = input.readArray(size);
         return Utils.utf8(data);
     }
 
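
The StringSerde change above relies on the new Readable#readArray(int) overload,
which (per this commit's added KRPC deserialization validation) is intended to
check the requested size against the bytes actually remaining before allocating,
so a corrupted length prefix can no longer force an oversized allocation. A
hypothetical custom serde read path built on the same API -- the class name is
invented for illustration, while Readable and Utils.utf8 are the real Kafka
types:

    import org.apache.kafka.common.protocol.Readable;
    import org.apache.kafka.common.utils.Utils;

    public final class UppercaseStringDeserializer {
        // readArray(int) validates `size` before allocating, unlike the
        // old pattern of allocating byte[size] first and filling it after.
        public static String read(Readable input, int size) {
            byte[] data = input.readArray(size);
            return Utils.utf8(data).toUpperCase();
        }
    }
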


[kafka] 03/06: MINOR: Update version to 3.0.2

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 17f695c4c2526c43893c566bb7d61a5103276b31
Author: Tom Bentley <tb...@redhat.com>
AuthorDate: Fri Sep 2 10:45:14 2022 +0100

    MINOR: Update version to 3.0.2
---
 docs/js/templateData.js                                                | 2 +-
 gradle.properties                                                      | 2 +-
 streams/quickstart/java/pom.xml                                        | 2 +-
 streams/quickstart/java/src/main/resources/archetype-resources/pom.xml | 2 +-
 streams/quickstart/pom.xml                                             | 2 +-
 tests/kafkatest/__init__.py                                            | 2 +-
 6 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/js/templateData.js b/docs/js/templateData.js
index 05541483dd..2609b50d6a 100644
--- a/docs/js/templateData.js
+++ b/docs/js/templateData.js
@@ -19,6 +19,6 @@ limitations under the License.
 var context={
     "version": "30",
     "dotVersion": "3.0",
-    "fullDotVersion": "3.0.2-SNAPSHOT",
+    "fullDotVersion": "3.0.2",
     "scalaVersion": "2.13"
 };
diff --git a/gradle.properties b/gradle.properties
index fc4d1bf2c1..329399cac9 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -20,7 +20,7 @@ group=org.apache.kafka
 #  - tests/kafkatest/__init__.py
 #  - tests/kafkatest/version.py (variable DEV_VERSION)
 #  - kafka-merge-pr.py
-version=3.0.2-SNAPSHOT
+version=3.0.2
 scalaVersion=2.13.6
 task=build
 org.gradle.jvmargs=-Xmx2g -Xss4m -XX:+UseParallelGC
diff --git a/streams/quickstart/java/pom.xml b/streams/quickstart/java/pom.xml
index 6109a2be47..ce84f4f66a 100644
--- a/streams/quickstart/java/pom.xml
+++ b/streams/quickstart/java/pom.xml
@@ -26,7 +26,7 @@
     <parent>
         <groupId>org.apache.kafka</groupId>
         <artifactId>streams-quickstart</artifactId>
-        <version>3.0.2-SNAPSHOT</version>
+        <version>3.0.2</version>
         <relativePath>..</relativePath>
     </parent>
 
diff --git a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
index 9415c364a0..e55003ede4 100644
--- a/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
+++ b/streams/quickstart/java/src/main/resources/archetype-resources/pom.xml
@@ -29,7 +29,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <kafka.version>3.0.2-SNAPSHOT</kafka.version>
+        <kafka.version>3.0.2</kafka.version>
         <slf4j.version>1.7.7</slf4j.version>
         <log4j.version>1.2.17</log4j.version>
     </properties>
diff --git a/streams/quickstart/pom.xml b/streams/quickstart/pom.xml
index 2c640549f1..f481c79052 100644
--- a/streams/quickstart/pom.xml
+++ b/streams/quickstart/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.kafka</groupId>
     <artifactId>streams-quickstart</artifactId>
     <packaging>pom</packaging>
-    <version>3.0.2-SNAPSHOT</version>
+    <version>3.0.2</version>
 
     <name>Kafka Streams :: Quickstart</name>
 
diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py
index 488e24d583..c8674298aa 100644
--- a/tests/kafkatest/__init__.py
+++ b/tests/kafkatest/__init__.py
@@ -22,4 +22,4 @@
 # Instead, in development branches, the version should have a suffix of the form ".devN"
 #
 # For example, when Kafka is at version 1.0.0-SNAPSHOT, this should be something like "1.0.0.dev0"
-__version__ = '3.0.2.dev0'
+__version__ = '3.0.2'


[kafka] 02/06: MINOR: Add configurable max receive size for SASL authentication requests

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 65a1e0451fc1012258582166732bba701331add1
Author: Manikumar Reddy <ma...@gmail.com>
AuthorDate: Mon May 16 19:25:02 2022 +0530

    MINOR: Add configurable max receive size for SASL authentication requests
    
    This adds a new configuration `sasl.server.max.receive.size` that sets the maximum receive size for requests before and during authentication.
    
    Reviewers: Tom Bentley <tb...@redhat.com>, Mickael Maison <mi...@gmail.com>
    
    Co-authored-by: Manikumar Reddy <ma...@gmail.com>
    Co-authored-by: Mickael Maison <mi...@gmail.com>
---
 checkstyle/suppressions.xml                        |  2 +
 .../config/internals/BrokerSecurityConfigs.java    |  6 +++
 .../authenticator/SaslServerAuthenticator.java     | 16 ++++++--
 .../kafka/common/security/TestSecurityConfig.java  |  2 +
 .../authenticator/SaslAuthenticatorTest.java       | 46 ++++++++++++++++++++++
 .../authenticator/SaslServerAuthenticatorTest.java |  6 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  4 ++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  2 +
 8 files changed, 77 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 69c9374f5b..d47545432a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -32,6 +32,8 @@
               files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
     <suppress checks="ClassFanOutComplexity"
               files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
+    <suppress checks="NPath"
+              files="SaslServerAuthenticator.java"/>
     <suppress checks="ClassFanOutComplexity"
               files="Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index 0b90da8f80..8b7a9649c2 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -36,6 +36,8 @@ public class BrokerSecurityConfigs {
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
     public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
     public static final String CONNECTIONS_MAX_REAUTH_MS = "connections.max.reauth.ms";
+    public static final int DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE = 524288;
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG = "sasl.server.max.receive.size";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
@@ -89,4 +91,8 @@ public class BrokerSecurityConfigs {
             + "The broker will disconnect any such connection that is not re-authenticated within the session lifetime and that is then subsequently "
             + "used for any purpose other than re-authentication. Configuration names can optionally be prefixed with listener prefix and SASL "
             + "mechanism name in lower-case. For example, listener.name.sasl_ssl.oauthbearer.connections.max.reauth.ms=3600000";
+
+    public static final String SASL_SERVER_MAX_RECEIVE_SIZE_DOC = "The maximum receive size allowed before and during initial SASL authentication." +
+            " Default receive size is 512KB. GSSAPI limits requests to 64K, but we allow upto 512KB by default for custom SASL mechanisms. In practice," +
+            " PLAIN, SCRAM and OAUTH mechanisms can use much smaller limits.";
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 6e35ee7a90..a48d7472dc 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
+import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.ByteBufferSend;
 import org.apache.kafka.common.network.ChannelBuilders;
@@ -88,8 +89,6 @@ import java.util.Optional;
 import java.util.function.Supplier;
 
 public class SaslServerAuthenticator implements Authenticator {
-    // GSSAPI limits requests to 64K, but we allow a bit extra for custom SASL mechanisms
-    static final int MAX_RECEIVE_SIZE = 524288;
     private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
 
     /**
@@ -140,6 +139,7 @@ public class SaslServerAuthenticator implements Authenticator {
     private String saslMechanism;
 
     // buffers used in `authenticate`
+    private Integer saslAuthRequestMaxReceiveSize;
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
     private Send authenticationFailureSend = null;
@@ -189,6 +189,10 @@ public class SaslServerAuthenticator implements Authenticator {
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
         this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, kerberosNameParser, null);
+
+        saslAuthRequestMaxReceiveSize = (Integer) configs.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
+        if (saslAuthRequestMaxReceiveSize == null)
+            saslAuthRequestMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE;
     }
 
     private void createSaslServer(String mechanism) throws IOException {
@@ -252,9 +256,13 @@ public class SaslServerAuthenticator implements Authenticator {
             }
 
             // allocate on heap (as opposed to any socket server memory pool)
-            if (netInBuffer == null) netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
+            if (netInBuffer == null) netInBuffer = new NetworkReceive(saslAuthRequestMaxReceiveSize, connectionId);
 
-            netInBuffer.readFrom(transportLayer);
+            try {
+                netInBuffer.readFrom(transportLayer);
+            } catch (InvalidReceiveException e) {
+                throw new SaslAuthenticationException("Failing SASL authentication due to invalid receive size", e);
+            }
             if (!netInBuffer.complete())
                 return;
             netInBuffer.payload().rewind();
diff --git a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
index 07cbb7856d..197151f5fb 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/TestSecurityConfig.java
@@ -38,6 +38,8 @@ public class TestSecurityConfig extends AbstractConfig {
                     null, Importance.MEDIUM, BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
             .define(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, Type.LONG, 0L, Importance.MEDIUM,
                     BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC)
+            .define(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, Type.INT, BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE,
+                    Importance.LOW, BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC)
             .withClientSslSupport()
             .withClientSaslSupport();
 
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 988a0f2823..40a27935f3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -212,6 +212,52 @@ public class SaslAuthenticatorTest {
         checkAuthenticationAndReauthentication(securityProtocol, node);
     }
 
+    /**
+     * Test SASL/PLAIN with the sasl.server.max.receive.size config
+     */
+    @Test
+    public void testSaslAuthenticationMaxReceiveSize() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SASL_PLAINTEXT;
+        configureMechanisms("PLAIN", Collections.singletonList("PLAIN"));
+
+        // test auth with 1KB receive size
+        saslServerConfigs.put(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG, "1024");
+        server = createEchoServer(securityProtocol);
+
+        // test valid sasl authentication
+        String node1 = "valid";
+        checkAuthenticationAndReauthentication(securityProtocol, node1);
+
+        // test with handshake request with large mechanism string
+        byte[] bytes = new byte[1024];
+        new Random().nextBytes(bytes);
+        String mechanism = new String(bytes, StandardCharsets.UTF_8);
+        String node2 = "invalid1";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node2);
+        SaslHandshakeRequest handshakeRequest = buildSaslHandshakeRequest(mechanism, ApiKeys.SASL_HANDSHAKE.latestVersion());
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, handshakeRequest.version(), "someclient", nextCorrelationId++);
+        NetworkSend send = new NetworkSend(node2, handshakeRequest.toSend(header));
+        selector.send(send);
+        // we will get an exception on the server and the connection will be closed.
+        NetworkTestUtils.waitForChannelClose(selector, node2, ChannelState.READY.state());
+        selector.close();
+
+        String node3 = "invalid2";
+        createClientConnection(SecurityProtocol.PLAINTEXT, node3);
+        sendHandshakeRequestReceiveResponse(node3, ApiKeys.SASL_HANDSHAKE.latestVersion());
+
+        // test with sasl authenticate request with large auth_bytes string
+        String authString = "\u0000" + TestJaasConfig.USERNAME + "\u0000" +  new String(bytes, StandardCharsets.UTF_8);
+        ByteBuffer authBuf = ByteBuffer.wrap(Utils.utf8(authString));
+        SaslAuthenticateRequestData data = new SaslAuthenticateRequestData().setAuthBytes(authBuf.array());
+        SaslAuthenticateRequest request = new SaslAuthenticateRequest.Builder(data).build();
+        header = new RequestHeader(ApiKeys.SASL_AUTHENTICATE, request.version(), "someclient", nextCorrelationId++);
+        send = new NetworkSend(node3, request.toSend(header));
+        selector.send(send);
+        NetworkTestUtils.waitForChannelClose(selector, node3, ChannelState.READY.state());
+        server.verifyAuthenticationMetrics(1, 2);
+    }
+
     /**
      * Tests that SASL/PLAIN clients with invalid password fail authentication.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index af0fedd4f5..8245f57516 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -19,11 +19,11 @@ package org.apache.kafka.common.security.authenticator;
 import java.net.InetAddress;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.message.ApiMessageType;
 import org.apache.kafka.common.network.ChannelMetadataRegistry;
 import org.apache.kafka.common.network.ClientInformation;
 import org.apache.kafka.common.network.DefaultChannelMetadataRegistry;
-import org.apache.kafka.common.network.InvalidReceiveException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -68,10 +68,10 @@ public class SaslServerAuthenticatorTest {
             SCRAM_SHA_256.mechanismName(), new DefaultChannelMetadataRegistry());
 
         when(transportLayer.read(any(ByteBuffer.class))).then(invocation -> {
-            invocation.<ByteBuffer>getArgument(0).putInt(SaslServerAuthenticator.MAX_RECEIVE_SIZE + 1);
+            invocation.<ByteBuffer>getArgument(0).putInt(BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE + 1);
             return 4;
         });
-        assertThrows(InvalidReceiveException.class, authenticator::authenticate);
+        assertThrows(SaslAuthenticationException.class, authenticator::authenticate);
         verify(transportLayer).read(any(ByteBuffer.class));
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index c556d7ab81..35d6f5b90a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -253,6 +253,7 @@ object Defaults {
 
     /** ********* General Security configuration ***********/
   val ConnectionsMaxReauthMsDefault = 0L
+  val DefaultServerMaxMaxReceiveSize = BrokerSecurityConfigs.DEFAULT_SASL_SERVER_MAX_RECEIVE_SIZE
   val DefaultPrincipalSerde = classOf[DefaultKafkaPrincipalBuilder]
 
   /** ********* Sasl configuration ***********/
@@ -549,6 +550,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassProp = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
   val ConnectionsMaxReauthMsProp = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS
+  val SaslServerMaxReceiveSizeProp = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG
   val securityProviderClassProp = SecurityConfig.SECURITY_PROVIDERS_CONFIG
 
   /** ********* SSL Configuration ****************/
@@ -960,6 +962,7 @@ object KafkaConfig {
   /** ******** Common Security Configuration *************/
   val PrincipalBuilderClassDoc = BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_DOC
   val ConnectionsMaxReauthMsDoc = BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_DOC
+  val SaslServerMaxReceiveSizeDoc = BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_DOC
   val securityProviderClassDoc = SecurityConfig.SECURITY_PROVIDERS_DOC
 
   /** ********* SSL Configuration ****************/
@@ -1249,6 +1252,7 @@ object KafkaConfig {
 
       /** ********* General Security Configuration ****************/
       .define(ConnectionsMaxReauthMsProp, LONG, Defaults.ConnectionsMaxReauthMsDefault, MEDIUM, ConnectionsMaxReauthMsDoc)
+      .define(SaslServerMaxReceiveSizeProp, INT, Defaults.DefaultServerMaxMaxReceiveSize, MEDIUM, SaslServerMaxReceiveSizeDoc)
       .define(securityProviderClassProp, STRING, null, LOW, securityProviderClassDoc)
 
       /** ********* SSL Configuration ****************/
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 2e38df00f1..a736bf4357 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -791,6 +791,8 @@ class KafkaConfigTest {
         case KafkaConfig.KafkaMetricsReporterClassesProp => // ignore
         case KafkaConfig.KafkaMetricsPollingIntervalSecondsProp => //ignore
 
+        case KafkaConfig.SaslServerMaxReceiveSizeProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
+
         // Raft Quorum Configs
         case RaftConfig.QUORUM_VOTERS_CONFIG => // ignore string
         case RaftConfig.QUORUM_ELECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")

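
Operationally, the new limit is a plain broker config. Brokers that legitimately
need larger pre-authentication requests (for example, unusually large Kerberos
tokens) can raise it, and it can also be lowered to reject oversized payloads
earlier. A sketch for server.properties -- the 1 MiB value below is illustrative
only, not a recommendation:

    # server.properties (sketch; 1 MiB chosen only for illustration)
    # Maximum receive size allowed before and during initial SASL
    # authentication; per this commit, larger receives now fail with
    # SaslAuthenticationException instead of InvalidReceiveException.
    sasl.server.max.receive.size=1048576
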

[kafka] 04/06: MINOR: Update docs/upgrade.html

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 110a640f76fcf27625913ee0764be0d32f4dbb98
Author: Tom Bentley <tb...@redhat.com>
AuthorDate: Fri Sep 2 11:08:24 2022 +0100

    MINOR: Update docs/upgrade.html
---
 docs/upgrade.html | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 55 insertions(+)

diff --git a/docs/upgrade.html b/docs/upgrade.html
index b13ae76252..b14f01dd1d 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,61 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_3_0_2" href="#upgrade_3_0_2">Upgrading to 3.0.2 from any version 0.8.x through 2.8.x</a></h4>
+
+<p><b>If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets.
+    Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1.</b></p>
+
+<p><b>For a rolling upgrade:</b></p>
+
+<ol>
+    <li> Update server.properties on all brokers and add the following properties. CURRENT_KAFKA_VERSION refers to the version you
+        are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously
+        overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior
+        to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., <code>2.6</code>, <code>2.5</code>, etc.)</li>
+            <li>log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  (See <a href="#upgrade_10_performance_impact">potential performance impact
+                following the upgrade</a> for the details on what this configuration does.)</li>
+        </ul>
+        If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override
+        the inter-broker protocol version.
+        <ul>
+            <li>inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., <code>2.6</code>, <code>2.5</code>, etc.)</li>
+        </ul>
+    </li>
+    <li> Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the
+        brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations.
+        It is still possible to downgrade at this point if there are any problems.
+    </li>
+    <li> Once the cluster's behavior and performance have been verified, bump the protocol version by editing
+        <code>inter.broker.protocol.version</code> and setting it to <code>3.0</code>.
+    </li>
+    <li> Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest
+        protocol version, it will no longer be possible to downgrade the cluster to an older version.
+    </li>
+    <li> If you have overridden the message format version as instructed above, then you need to do one more rolling restart to
+        upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later,
+        change log.message.format.version to 3.0 on each broker and restart them one by one. Note that the older Scala clients,
+        which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs
+        (or to take advantage of <a href="#upgrade_11_exactly_once_semantics">exactly once semantics</a>),
+        the newer Java clients must be used.
+    </li>
+</ol>
+
+<h5><a id="upgrade_302_notable" href="#upgrade_302_notable">Notable changes in 3.0.2</a></h5>
+<ul>
+    <li>A notable exception to producer idempotence being enabled by default in 3.0.1 is Connect.
+        In 3.0.2, Connect disables idempotent behavior for all of its producers by default
+        in order to uniformly support a wide range of Kafka broker versions.
+        Users can change this behavior to enable idempotence for some or all producers
+        via Connect worker and/or connector configuration. Connect may enable idempotent producers
+        by default in a future major release.</li>
+    <li>The example connectors, <code>FileStreamSourceConnector</code> and <code>FileStreamSinkConnector</code>, have been
+        removed from the default classpath. To use them in Kafka Connect standalone or distributed mode they need to be
+        explicitly added, for example <code>CLASSPATH=./lib/connect-file-3.0.2.jar ./bin/connect-distributed.sh</code>.</li>
+</ul>
+
 <h5><a id="upgrade_301_notable" href="#upgrade_301_notable">Notable changes in 3.0.1</a></h5>
 <ul>
     <li>Idempotence for the producer is enabled by default if no conflicting configurations are set. When producing to brokers older than 2.8.0,

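
To make the first step of the rolling upgrade above concrete, a broker coming
from 2.8 with an overridden message format would carry overrides like the
following until the verification and bump steps; the version values here are
examples only:

    # server.properties during the initial rolling restart (example values)
    inter.broker.protocol.version=2.8
    log.message.format.version=2.8

    # after verifying the cluster on 3.0.2, bump and roll again:
    # inter.broker.protocol.version=3.0
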

[kafka] 05/06: MINOR: Update LICENSE-binary

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tombentley pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 286eceea7de742a35a4f0a6a979c704561e2039d
Author: Tom Bentley <tb...@redhat.com>
AuthorDate: Fri Sep 2 12:21:43 2022 +0100

    MINOR: Update LICENSE-binary
---
 LICENSE-binary | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 057b88f0b2..34c456ed0b 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -219,16 +219,16 @@ jackson-module-jaxb-annotations-2.12.6
 jackson-module-scala_2.13-2.12.6
 jakarta.validation-api-2.0.2
 javassist-3.27.0-GA
-jetty-client-9.4.44.v20210927
-jetty-continuation-9.4.44.v20210927
-jetty-http-9.4.44.v20210927
-jetty-io-9.4.44.v20210927
-jetty-security-9.4.44.v20210927
-jetty-server-9.4.44.v20210927
-jetty-servlet-9.4.44.v20210927
-jetty-servlets-9.4.44.v20210927
-jetty-util-9.4.44.v20210927
-jetty-util-ajax-9.4.44.v20210927
+jetty-client-9.4.48.v20220622
+jetty-continuation-9.4.48.v20220622
+jetty-http-9.4.48.v20220622
+jetty-io-9.4.48.v20220622
+jetty-security-9.4.48.v20220622
+jetty-server-9.4.48.v20220622
+jetty-servlet-9.4.48.v20220622
+jetty-servlets-9.4.48.v20220622
+jetty-util-9.4.48.v20220622
+jetty-util-ajax-9.4.48.v20220622
 jersey-common-2.34
 jersey-server-2.34
 log4j-1.2.17