Posted to commits@kafka.apache.org by cm...@apache.org on 2019/05/09 18:08:45 UTC

[kafka] branch trunk updated: KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a97e55b  KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
a97e55b is described below

commit a97e55b83868ff786e740db55e73116f85456dcb
Author: Bob Barrett <bo...@outlook.com>
AuthorDate: Thu May 9 11:08:22 2019 -0700

    KAFKA-8332: Refactor ImplicitLinkedHashSet to avoid losing ordering when converting to Scala
    
    Because of how conversions between Java collections and Scala collections work, ImplicitLinkedHashMultiSet objects were being treated as unordered in some contexts where they shouldn't be.  This broke JOIN_GROUP handling.
    
    This patch renames ImplicitLinkedHashSet to ImplicitLinkedHashCollection and ImplicitLinkedHashMultiSet to ImplicitLinkedHashMultiCollection.  The order of Collection objects is now preserved when converting to Scala.  Adding Set and List "views" to the Collection gives callers set- or list-style access when they need it.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
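
For illustration, here is a minimal sketch of how the new views behave.  It reuses the
TestElement helper defined in the new ImplicitLinkedHashCollectionTest below, and it is
assumed to live alongside that test in org.apache.kafka.common.utils; the class name
ViewsSketch is made up for this note, and the snippet is not part of the patch itself.

    package org.apache.kafka.common.utils;

    import java.util.List;
    import java.util.Set;

    import org.apache.kafka.common.utils.ImplicitLinkedHashCollectionTest.TestElement;

    public class ViewsSketch {
        public static void main(String[] args) {
            ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
            coll.add(new TestElement(1));
            coll.add(new TestElement(2));
            coll.add(new TestElement(3));

            // valuesList(): a List view in insertion order, backed by the collection.
            // Index-based operations such as get() and remove(int) run in O(n).
            List<TestElement> list = coll.valuesList();
            list.remove(1);                      // drops TestElement(2) from coll as well

            // valuesSet(): a Set view backed by the collection; additions and
            // removals write through to the underlying collection.
            Set<TestElement> set = coll.valuesSet();
            set.add(new TestElement(4));         // visible through coll and list too
        }
    }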
---
 .../kafka/clients/admin/KafkaAdminClient.java      |   8 +-
 .../consumer/internals/AbstractCoordinator.java    |   2 +-
 .../consumer/internals/ConsumerCoordinator.java    |   4 +-
 .../requests/ControlledShutdownResponse.java       |   3 +-
 ...hSet.java => ImplicitLinkedHashCollection.java} | 218 ++++++++-
 ...java => ImplicitLinkedHashMultiCollection.java} |  12 +-
 .../internals/AbstractCoordinatorTest.java         |   4 +-
 .../apache/kafka/common/message/MessageTest.java   |   7 +-
 .../kafka/common/requests/RequestResponseTest.java |   9 +-
 .../utils/ImplicitLinkedHashCollectionTest.java    | 536 +++++++++++++++++++++
 ... => ImplicitLinkedHashMultiCollectionTest.java} |  16 +-
 .../common/utils/ImplicitLinkedHashSetTest.java    | 245 ----------
 .../runtime/distributed/WorkerCoordinator.java     |   4 +-
 .../runtime/distributed/WorkerCoordinatorTest.java |   2 +-
 .../src/main/scala/kafka/server/FetchSession.scala |  10 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  14 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  11 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   6 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  37 ++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 .../apache/kafka/message/MessageDataGenerator.java |  34 +-
 .../org/apache/kafka/message/MessageGenerator.java |   4 +-
 22 files changed, 853 insertions(+), 341 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index a0958e9..8870519 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -61,7 +61,7 @@ import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
 import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
 import org.apache.kafka.common.message.DeleteTopicsRequestData;
 import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -71,7 +71,7 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
 import org.apache.kafka.common.message.MetadataRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
 import org.apache.kafka.common.metrics.JmxReporter;
@@ -1280,7 +1280,7 @@ public class KafkaAdminClient extends AdminClient {
     public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                                            final CreateTopicsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size());
-        final CreatableTopicSet topics = new CreatableTopicSet();
+        final CreatableTopicCollection topics = new CreatableTopicCollection();
         for (NewTopic newTopic : newTopics) {
             if (topicNameIsUnrepresentable(newTopic.name())) {
                 KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
@@ -2025,7 +2025,7 @@ public class KafkaAdminClient extends AdminClient {
         IncrementalAlterConfigsRequestData requestData = new IncrementalAlterConfigsRequestData();
         requestData.setValidateOnly(validateOnly);
         for (ConfigResource resource : resources) {
-            AlterableConfigSet alterableConfigSet = new AlterableConfigSet();
+            AlterableConfigCollection alterableConfigSet = new AlterableConfigCollection();
             for (AlterConfigOp configEntry : configs.get(resource))
                 alterableConfigSet.add(new AlterableConfig().
                         setName(configEntry.configEntry().name()).
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 69d4928..ebe8231 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -188,7 +188,7 @@ public abstract class AbstractCoordinator implements Closeable {
      * on the preference).
      * @return Non-empty map of supported protocols and metadata
      */
-    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolSet metadata();
+    protected abstract JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata();
 
     /**
      * Invoked prior to each group join or rejoin. This is typically used to perform any
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 829d9dc..95cff78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -171,10 +171,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
     @Override
-    protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+    protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
         log.debug("Joining group with current subscription: {}", subscriptions.subscription());
         this.joinedSubscription = subscriptions.subscription();
-        JoinGroupRequestData.JoinGroupRequestProtocolSet protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet();
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolCollection();
 
         for (PartitionAssignor assignor : assignors) {
             Subscription subscription = assignor.subscription(joinedSubscription);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index 5448f03..a49b84a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
-import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -74,7 +73,7 @@ public class ControlledShutdownResponse extends AbstractResponse {
     public static ControlledShutdownResponse prepareResponse(Errors error, Set<TopicPartition> tps) {
         ControlledShutdownResponseData data = new ControlledShutdownResponseData();
         data.setErrorCode(error.code());
-        ControlledShutdownResponseData.RemainingPartitionSet pSet = new RemainingPartitionSet();
+        ControlledShutdownResponseData.RemainingPartitionCollection pSet = new ControlledShutdownResponseData.RemainingPartitionCollection();
         tps.forEach(tp -> {
             pSet.add(new RemainingPartition()
                     .setTopicName(tp.topic())
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
similarity index 66%
rename from clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
rename to clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
index 75fc9ee..e060629 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashSet.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollection.java
@@ -17,9 +17,14 @@
 
 package org.apache.kafka.common.utils;
 
+import java.util.AbstractCollection;
+import java.util.AbstractSequentialList;
 import java.util.AbstractSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 /**
  * A memory-efficient hash set which tracks the order of insertion of elements.
@@ -40,7 +45,7 @@ import java.util.NoSuchElementException;
  *
  * This set does not allow null elements.  It does not have internal synchronization.
  */
-public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> extends AbstractSet<E> {
+public class ImplicitLinkedHashCollection<E extends ImplicitLinkedHashCollection.Element> extends AbstractCollection<E> {
     public interface Element {
         int prev();
         void setPrev(int prev);
@@ -127,35 +132,137 @@ public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> exte
         element.setPrev(INVALID_INDEX);
     }
 
-    private class ImplicitLinkedHashSetIterator implements Iterator<E> {
+    private class ImplicitLinkedHashCollectionIterator implements ListIterator<E> {
+        private int cursor = 0;
         private Element cur = head;
+        private int lastReturnedSlot = INVALID_INDEX;
 
-        private Element next = indexToElement(head, elements, head.next());
+        ImplicitLinkedHashCollectionIterator(int index) {
+            for (int i = 0; i < index; ++i) {
+                cur = indexToElement(head, elements, cur.next());
+                cursor++;
+            }
+        }
 
         @Override
         public boolean hasNext() {
-            return next != head;
+            return cursor != size;
+        }
+
+        @Override
+        public boolean hasPrevious() {
+            return cursor != 0;
         }
 
         @Override
         public E next() {
-            if (next == head) {
+            if (cursor == size) {
+                throw new NoSuchElementException();
+            }
+            lastReturnedSlot = cur.next();
+            cur = indexToElement(head, elements, cur.next());
+            ++cursor;
+            @SuppressWarnings("unchecked")
+            E returnValue = (E) cur;
+            return returnValue;
+        }
+
+        @Override
+        public E previous() {
+            if (cursor == 0) {
                 throw new NoSuchElementException();
             }
-            cur = next;
-            next = indexToElement(head, elements, cur.next());
             @SuppressWarnings("unchecked")
             E returnValue = (E) cur;
+            cur = indexToElement(head, elements, cur.prev());
+            lastReturnedSlot = cur.next();
+            --cursor;
             return returnValue;
         }
 
         @Override
+        public int nextIndex() {
+            return cursor;
+        }
+
+        @Override
+        public int previousIndex() {
+            return cursor - 1;
+        }
+
+        @Override
         public void remove() {
-            if (cur == head) {
+            if (lastReturnedSlot == INVALID_INDEX) {
                 throw new IllegalStateException();
             }
-            ImplicitLinkedHashSet.this.remove(cur);
-            cur = head;
+
+            if (cur == indexToElement(head, elements, lastReturnedSlot)) {
+                cursor--;
+                cur = indexToElement(head, elements, cur.prev());
+            }
+            ImplicitLinkedHashCollection.this.removeElementAtSlot(lastReturnedSlot);
+
+            lastReturnedSlot = INVALID_INDEX;
+        }
+
+        @Override
+        public void set(E e) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void add(E e) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    private class ImplicitLinkedHashCollectionListView extends AbstractSequentialList<E> {
+
+        @Override
+        public ListIterator<E> listIterator(int index) {
+            if (index < 0 || index > size) {
+                throw new IndexOutOfBoundsException();
+            }
+
+            return ImplicitLinkedHashCollection.this.listIterator(index);
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+    }
+
+    private class ImplicitLinkedHashCollectionSetView extends AbstractSet<E> {
+
+        @Override
+        public Iterator<E> iterator() {
+            return ImplicitLinkedHashCollection.this.iterator();
+        }
+
+        @Override
+        public int size() {
+            return size;
+        }
+
+        @Override
+        public boolean add(E newElement) {
+            return ImplicitLinkedHashCollection.this.add(newElement);
+        }
+
+        @Override
+        public boolean remove(Object key) {
+            return ImplicitLinkedHashCollection.this.remove(key);
+        }
+
+        @Override
+        public boolean contains(Object key) {
+            return ImplicitLinkedHashCollection.this.contains(key);
+        }
+
+        @Override
+        public void clear() {
+            ImplicitLinkedHashCollection.this.clear();
         }
     }
 
@@ -174,7 +281,11 @@ public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> exte
      */
     @Override
     final public Iterator<E> iterator() {
-        return new ImplicitLinkedHashSetIterator();
+        return listIterator(0);
+    }
+
+    private ListIterator<E> listIterator(int index) {
+        return new ImplicitLinkedHashCollectionIterator(index);
     }
 
     final int slot(Element[] curElements, Object e) {
@@ -402,30 +513,30 @@ public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> exte
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      */
-    public ImplicitLinkedHashSet() {
+    public ImplicitLinkedHashCollection() {
         this(0);
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      *
      * @param expectedNumElements   The number of elements we expect to have in this set.
      *                              This is used to optimize by setting the capacity ahead
      *                              of time rather than growing incrementally.
      */
-    public ImplicitLinkedHashSet(int expectedNumElements) {
+    public ImplicitLinkedHashCollection(int expectedNumElements) {
         clear(expectedNumElements);
     }
 
     /**
-     * Create a new ImplicitLinkedHashSet.
+     * Create a new ImplicitLinkedHashCollection.
      *
      * @param iter                  We will add all the elements accessible through this iterator
      *                              to the set.
      */
-    public ImplicitLinkedHashSet(Iterator<E> iter) {
+    public ImplicitLinkedHashCollection(Iterator<E> iter) {
         clear(0);
         while (iter.hasNext()) {
             mustAdd(iter.next());
@@ -457,8 +568,81 @@ public class ImplicitLinkedHashSet<E extends ImplicitLinkedHashSet.Element> exte
         }
     }
 
+    /**
+     * Compares the specified object with this collection for equality. Two
+     * {@code ImplicitLinkedHashCollection} objects are equal if they contain the
+     * same elements (as determined by the element's {@code equals} method), and
+     * those elements were inserted in the same order. Because
+     * {@code ImplicitLinkedHashCollectionIterator} iterates over the elements
+     * in insertion order, it is sufficient to call {@code valuesList.equals}.
+     *
+     * Note that {@link ImplicitLinkedHashMultiCollection} does not override
+     * {@code equals} and uses this method as well. This means that two
+     * {@code ImplicitLinkedHashMultiCollection} objects will be considered equal even
+     * if they each contain two elements A and B such that A.equals(B) but A != B and
+     * A and B have switched insertion positions between the two collections. This
+     * is an acceptable definition of equality, because the collections are still
+     * equal in terms of the order and value of each element.
+     *
+     * @param o object to be compared for equality with this collection
+     * @return true if the specified object is equal to this collection
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+
+        if (!(o instanceof ImplicitLinkedHashCollection))
+            return false;
+
+        ImplicitLinkedHashCollection<?> ilhs = (ImplicitLinkedHashCollection<?>) o;
+        return this.valuesList().equals(ilhs.valuesList());
+    }
+
+    /**
+     * Returns the hash code value for this collection. Because
+     * {@code ImplicitLinkedHashCollection.equals} compares the {@code valuesList}
+     * of two {@code ImplicitLinkedHashCollection} objects to determine equality,
+     * this method uses the {@code valuesList} to compute the hash code value as well.
+     *
+     * @return the hash code value for this collection
+     */
+    @Override
+    public int hashCode() {
+        return this.valuesList().hashCode();
+    }
+
     // Visible for testing
     final int numSlots() {
         return elements.length;
     }
+
+    /**
+     * Returns a {@link List} view of the elements contained in the collection,
+     * ordered by order of insertion into the collection. The list is backed by the
+     * collection, so changes to the collection are reflected in the list and
+     * vice-versa. The list supports element removal, which removes the corresponding
+     * element from the collection, but does not support the {@code add} or
+     * {@code set} operations.
+     *
+     * The list is implemented as a circular linked list, so all index-based
+     * operations, such as {@code List.get}, run in O(n) time.
+     *
+     * @return a list view of the elements contained in this collection
+     */
+    public List<E> valuesList() {
+        return new ImplicitLinkedHashCollectionListView();
+    }
+
+    /**
+     * Returns a {@link Set} view of the elements contained in the collection. The
+     * set is backed by the collection, so changes to the collection are reflected in
+     * the set, and vice versa. The set supports element removal and addition, which
+     * removes from or adds to the collection, respectively.
+     *
+     * @return a set view of the elements contained in this collection
+     */
+    public Set<E> valuesSet() {
+        return new ImplicitLinkedHashCollectionSetView();
+    }
 }
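
A short sketch of the ordered-equality contract documented in the javadoc above, again
reusing the TestElement helper from the new unit test (fragment only; it assumes the
same package and imports as the sketch after the commit message):

    ImplicitLinkedHashCollection<TestElement> a = new ImplicitLinkedHashCollection<>();
    a.add(new TestElement(1));
    a.add(new TestElement(2));

    ImplicitLinkedHashCollection<TestElement> b = new ImplicitLinkedHashCollection<>();
    b.add(new TestElement(2));
    b.add(new TestElement(1));

    // equals() compares the insertion-ordered valuesList() of each collection,
    // so the same elements inserted in a different order are not equal.
    boolean eq = a.equals(b);          // false
    // hashCode() is also derived from valuesList(), so it is order-sensitive
    // and will generally differ between a and b as well.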
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
similarity index 91%
rename from clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
rename to clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
index 2eb53f6..85714d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSet.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollection.java
@@ -24,7 +24,7 @@ import java.util.List;
 
 /**
  * A memory-efficient hash multiset which tracks the order of insertion of elements.
- * See org.apache.kafka.common.utils.ImplicitLinkedHashSet for implementation details.
+ * See org.apache.kafka.common.utils.ImplicitLinkedHashCollection for implementation details.
  *
  * This class is a multi-set because it allows multiple elements to be inserted that are
  * equal to each other.
@@ -42,17 +42,17 @@ import java.util.List;
  *
  * This multiset does not allow null elements.  It does not have internal synchronization.
  */
-public class ImplicitLinkedHashMultiSet<E extends ImplicitLinkedHashSet.Element>
-        extends ImplicitLinkedHashSet<E> {
-    public ImplicitLinkedHashMultiSet() {
+public class ImplicitLinkedHashMultiCollection<E extends ImplicitLinkedHashCollection.Element>
+        extends ImplicitLinkedHashCollection<E> {
+    public ImplicitLinkedHashMultiCollection() {
         super(0);
     }
 
-    public ImplicitLinkedHashMultiSet(int expectedNumElements) {
+    public ImplicitLinkedHashMultiCollection(int expectedNumElements) {
         super(expectedNumElements);
     }
 
-    public ImplicitLinkedHashMultiSet(Iterator<E> iter) {
+    public ImplicitLinkedHashMultiCollection(Iterator<E> iter) {
         super(iter);
     }
 
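
A short sketch of the multi-collection semantics described above: elements that are
equal to each other may coexist in the same collection (fragment only, same
assumptions as the earlier sketches):

    ImplicitLinkedHashMultiCollection<TestElement> multi = new ImplicitLinkedHashMultiCollection<>();
    TestElement e1 = new TestElement(1);
    TestElement e2 = new TestElement(1);   // equals(e1), but a distinct object

    multi.add(e1);                         // accepted
    multi.add(e2);                         // also accepted: both copies are kept
    // Iteration still follows insertion order: e1 first, then e2.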
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 4ce0386..8d8afd2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -818,8 +818,8 @@ public class AbstractCoordinatorTest {
         }
 
         @Override
-        protected JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
-            return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
+            return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                     Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
                             .setName("dummy-subprotocol")
                             .setMetadata(EMPTY_DATA.array())).iterator()
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index fdb538e..1780a71 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.common.message;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Message;
@@ -36,7 +35,7 @@ import java.util.List;
 import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
-import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicSet;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -71,12 +70,12 @@ public final class MessageTest {
             setTransactionalId("blah").
             setProducerId(0xbadcafebadcafeL).
             setProducerEpoch((short) 30000).
-            setTopics(new AddPartitionsToTxnTopicSet(Collections.singletonList(
+            setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
                 new AddPartitionsToTxnTopic().
                     setName("Topic").
                     setPartitions(Collections.singletonList(1))).iterator())));
         testMessageRoundTrips(new CreateTopicsRequestData().
-            setTimeoutMs(1000).setTopics(new CreatableTopicSet()));
+            setTimeoutMs(1000).setTopics(new CreateTopicsRequestData.CreatableTopicCollection()));
         testMessageRoundTrips(new DescribeAclsRequestData().
             setResourceType((byte) 42).
             setResourceNameFilter(null).
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ce7d1d3..43057a6 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -35,7 +35,7 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.ControlledShutdownRequestData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData;
 import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
-import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionSet;
+import org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
 import org.apache.kafka.common.message.CreateTopicsRequestData;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
@@ -67,7 +67,6 @@ import org.apache.kafka.common.message.SaslHandshakeRequestData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigSet;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResult;
@@ -781,7 +780,7 @@ public class RequestResponseTest {
     }
 
     private JoinGroupRequest createJoinGroupRequest(int version) {
-        JoinGroupRequestData.JoinGroupRequestProtocolSet protocols = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                 Collections.singleton(
                 new JoinGroupRequestData.JoinGroupRequestProtocol()
                         .setName("consumer-range")
@@ -1053,7 +1052,7 @@ public class RequestResponseTest {
         RemainingPartition p2 = new RemainingPartition()
                 .setTopicName("test1")
                 .setPartitionIndex(10);
-        RemainingPartitionSet pSet = new RemainingPartitionSet();
+        RemainingPartitionCollection pSet = new RemainingPartitionCollection();
         pSet.add(p1);
         pSet.add(p2);
         ControlledShutdownResponseData data = new ControlledShutdownResponseData()
@@ -1522,7 +1521,7 @@ public class RequestResponseTest {
                 .setName("retention.ms")
                 .setConfigOperation((byte) 0)
                 .setValue("100");
-        AlterableConfigSet alterableConfigs = new AlterableConfigSet();
+        IncrementalAlterConfigsRequestData.AlterableConfigCollection alterableConfigs = new IncrementalAlterConfigsRequestData.AlterableConfigCollection();
         alterableConfigs.add(alterableConfig);
 
         data.resources().add(new AlterConfigsResource()
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
new file mode 100644
index 0000000..8c102dd
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Random;
+import java.util.Set;
+
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * A unit test for ImplicitLinkedHashCollection.
+ */
+public class ImplicitLinkedHashCollectionTest {
+    @Rule
+    final public Timeout globalTimeout = Timeout.millis(120000);
+
+    final static class TestElement implements ImplicitLinkedHashCollection.Element {
+        private int prev = ImplicitLinkedHashCollection.INVALID_INDEX;
+        private int next = ImplicitLinkedHashCollection.INVALID_INDEX;
+        private final int val;
+
+        TestElement(int val) {
+            this.val = val;
+        }
+
+        @Override
+        public int prev() {
+            return prev;
+        }
+
+        @Override
+        public void setPrev(int prev) {
+            this.prev = prev;
+        }
+
+        @Override
+        public int next() {
+            return next;
+        }
+
+        @Override
+        public void setNext(int next) {
+            this.next = next;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if ((o == null) || (o.getClass() != TestElement.class)) return false;
+            TestElement that = (TestElement) o;
+            return val == that.val;
+        }
+
+        @Override
+        public String toString() {
+            return "TestElement(" + val + ")";
+        }
+
+        @Override
+        public int hashCode() {
+            return val;
+        }
+    }
+
+    @Test
+    public void testNullForbidden() {
+        ImplicitLinkedHashMultiCollection<TestElement> multiColl = new ImplicitLinkedHashMultiCollection<>();
+        assertFalse(multiColl.add(null));
+    }
+
+    @Test
+    public void testInsertDelete() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>(100);
+        assertTrue(coll.add(new TestElement(1)));
+        TestElement second = new TestElement(2);
+        assertTrue(coll.add(second));
+        assertTrue(coll.add(new TestElement(3)));
+        assertFalse(coll.add(new TestElement(3)));
+        assertEquals(3, coll.size());
+        assertTrue(coll.contains(new TestElement(1)));
+        assertFalse(coll.contains(new TestElement(4)));
+        TestElement secondAgain = coll.find(new TestElement(2));
+        assertTrue(second == secondAgain);
+        assertTrue(coll.remove(new TestElement(1)));
+        assertFalse(coll.remove(new TestElement(1)));
+        assertEquals(2, coll.size());
+        coll.clear();
+        assertEquals(0, coll.size());
+    }
+
+    static void expectTraversal(Iterator<TestElement> iterator, Integer... sequence) {
+        int i = 0;
+        while (iterator.hasNext()) {
+            TestElement element = iterator.next();
+            Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but only " +
+                    sequence.length + " were expected.", i < sequence.length);
+            Assert.assertEquals("Iterator value number " + (i + 1) + " was incorrect.",
+                    sequence[i].intValue(), element.val);
+            i = i + 1;
+        }
+        Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but " +
+                sequence.length + " were expected.", i == sequence.length);
+    }
+
+    static void expectTraversal(Iterator<TestElement> iter, Iterator<Integer> expectedIter) {
+        int i = 0;
+        while (iter.hasNext()) {
+            TestElement element = iter.next();
+            Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but only " +
+                    i + " were expected.", expectedIter.hasNext());
+            Integer expected = expectedIter.next();
+            Assert.assertEquals("Iterator value number " + (i + 1) + " was incorrect.",
+                    expected.intValue(), element.val);
+            i = i + 1;
+        }
+        Assert.assertFalse("Iterator yielded " + i + " elements, but at least " +
+                (i + 1) + " were expected.", expectedIter.hasNext());
+    }
+
+    @Test
+    public void testTraversal() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        expectTraversal(coll.iterator());
+        assertTrue(coll.add(new TestElement(2)));
+        expectTraversal(coll.iterator(), 2);
+        assertTrue(coll.add(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 1);
+        assertTrue(coll.add(new TestElement(100)));
+        expectTraversal(coll.iterator(), 2, 1, 100);
+        assertTrue(coll.remove(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 100);
+        assertTrue(coll.add(new TestElement(1)));
+        expectTraversal(coll.iterator(), 2, 100, 1);
+        Iterator<TestElement> iter = coll.iterator();
+        iter.next();
+        iter.next();
+        iter.remove();
+        iter.next();
+        assertFalse(iter.hasNext());
+        expectTraversal(coll.iterator(), 2, 1);
+        List<TestElement> list = new ArrayList<>();
+        list.add(new TestElement(1));
+        list.add(new TestElement(2));
+        assertTrue(coll.removeAll(list));
+        assertFalse(coll.removeAll(list));
+        expectTraversal(coll.iterator());
+        assertEquals(0, coll.size());
+        assertTrue(coll.isEmpty());
+    }
+
+    @Test
+    public void testSetViewGet() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        Set<TestElement> set = coll.valuesSet();
+        assertTrue(set.contains(new TestElement(1)));
+        assertTrue(set.contains(new TestElement(2)));
+        assertTrue(set.contains(new TestElement(3)));
+        assertEquals(3, set.size());
+    }
+
+    @Test
+    public void testSetViewModification() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Removal from set is reflected in collection
+        Set<TestElement> set = coll.valuesSet();
+        set.remove(new TestElement(1));
+        assertFalse(coll.contains(new TestElement(1)));
+        assertEquals(2, coll.size());
+
+        // Addition to set is reflected in collection
+        set.add(new TestElement(4));
+        assertTrue(coll.contains(new TestElement(4)));
+        assertEquals(3, coll.size());
+
+        // Removal from collection is reflected in set
+        coll.remove(new TestElement(2));
+        assertFalse(set.contains(new TestElement(2)));
+        assertEquals(2, set.size());
+
+        // Addition to collection is reflected in set
+        coll.add(new TestElement(5));
+        assertTrue(set.contains(new TestElement(5)));
+        assertEquals(3, set.size());
+
+        // Ordering in the collection is maintained
+        int val = 3;
+        for (TestElement e : coll) {
+            assertEquals(val, e.val);
+            ++val;
+        }
+    }
+
+    @Test
+    public void testListViewGet() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        List<TestElement> list = coll.valuesList();
+        assertEquals(1, list.get(0).val);
+        assertEquals(2, list.get(1).val);
+        assertEquals(3, list.get(2).val);
+        assertEquals(3, list.size());
+    }
+
+    @Test
+    public void testListViewModification() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Removal from list is reflected in collection
+        List<TestElement> list = coll.valuesList();
+        list.remove(1);
+        assertTrue(coll.contains(new TestElement(1)));
+        assertFalse(coll.contains(new TestElement(2)));
+        assertTrue(coll.contains(new TestElement(3)));
+        assertEquals(2, coll.size());
+
+        // Removal from collection is reflected in list
+        coll.remove(new TestElement(1));
+        assertEquals(3, list.get(0).val);
+        assertEquals(1, list.size());
+
+        // Addition to collection is reflected in list
+        coll.add(new TestElement(4));
+        assertEquals(3, list.get(0).val);
+        assertEquals(4, list.get(1).val);
+        assertEquals(2, list.size());
+    }
+
+    @Test
+    public void testEmptyListIterator() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        ListIterator iter = coll.valuesList().listIterator();
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorCreation() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+
+        // Iterator created at the start of the list should have a next but no prev
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        // Iterator created in the middle of the list should have both a next and a prev
+        iter = coll.valuesList().listIterator(2);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        // Iterator created at the end of the list should have a prev but no next
+        iter = coll.valuesList().listIterator(3);
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(3, iter.nextIndex());
+        assertEquals(2, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorTraversal() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+
+        // Step the iterator forward to the end of the list
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        assertEquals(1, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        assertEquals(2, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(3, iter.next().val);
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(3, iter.nextIndex());
+        assertEquals(2, iter.previousIndex());
+
+        // Step back to the middle of the list
+        assertEquals(3, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(2, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Step forward one and then back one, return value should remain the same
+        assertEquals(2, iter.next().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        assertEquals(2, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Step back to the front of the list
+        assertEquals(1, iter.previous().val);
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+    }
+
+    @Test
+    public void testListIteratorRemove() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        coll.add(new TestElement(1));
+        coll.add(new TestElement(2));
+        coll.add(new TestElement(3));
+        coll.add(new TestElement(4));
+        coll.add(new TestElement(5));
+
+        ListIterator<TestElement> iter = coll.valuesList().listIterator();
+        try {
+            iter.remove();
+            fail("Calling remove() without calling next() or previous() should raise an exception");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // Remove after next()
+        iter.next();
+        iter.next();
+        iter.next();
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(2, iter.nextIndex());
+        assertEquals(1, iter.previousIndex());
+
+        try {
+            iter.remove();
+            fail("Calling remove() twice without calling next() or previous() in between should raise an exception");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // Remove after previous()
+        assertEquals(2, iter.previous().val);
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Remove the first element of the list
+        assertEquals(1, iter.previous().val);
+        iter.remove();
+        assertTrue(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+        // Remove the last element of the list
+        assertEquals(4, iter.next().val);
+        assertEquals(5, iter.next().val);
+        iter.remove();
+        assertFalse(iter.hasNext());
+        assertTrue(iter.hasPrevious());
+        assertEquals(1, iter.nextIndex());
+        assertEquals(0, iter.previousIndex());
+
+        // Remove the final remaining element of the list
+        assertEquals(4, iter.previous().val);
+        iter.remove();
+        assertFalse(iter.hasNext());
+        assertFalse(iter.hasPrevious());
+        assertEquals(0, iter.nextIndex());
+        assertEquals(-1, iter.previousIndex());
+
+    }
+
+    @Test
+    public void testCollisions() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>(5);
+        assertEquals(11, coll.numSlots());
+        assertTrue(coll.add(new TestElement(11)));
+        assertTrue(coll.add(new TestElement(0)));
+        assertTrue(coll.add(new TestElement(22)));
+        assertTrue(coll.add(new TestElement(33)));
+        assertEquals(11, coll.numSlots());
+        expectTraversal(coll.iterator(), 11, 0, 22, 33);
+        assertTrue(coll.remove(new TestElement(22)));
+        expectTraversal(coll.iterator(), 11, 0, 33);
+        assertEquals(3, coll.size());
+        assertFalse(coll.isEmpty());
+    }
+
+    @Test
+    public void testEnlargement() {
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>(5);
+        assertEquals(11, coll.numSlots());
+        for (int i = 0; i < 6; i++) {
+            assertTrue(coll.add(new TestElement(i)));
+        }
+        assertEquals(23, coll.numSlots());
+        assertEquals(6, coll.size());
+        expectTraversal(coll.iterator(), 0, 1, 2, 3, 4, 5);
+        for (int i = 0; i < 6; i++) {
+            assertTrue("Failed to find element " + i, coll.contains(new TestElement(i)));
+        }
+        coll.remove(new TestElement(3));
+        assertEquals(23, coll.numSlots());
+        assertEquals(5, coll.size());
+        expectTraversal(coll.iterator(), 0, 1, 2, 4, 5);
+    }
+
+    @Test
+    public void testManyInsertsAndDeletes() {
+        Random random = new Random(123);
+        LinkedHashSet<Integer> existing = new LinkedHashSet<>();
+        ImplicitLinkedHashCollection<TestElement> coll = new ImplicitLinkedHashCollection<>();
+        for (int i = 0; i < 100; i++) {
+            addRandomElement(random, existing, coll);
+            addRandomElement(random, existing, coll);
+            addRandomElement(random, existing, coll);
+            removeRandomElement(random, existing, coll);
+            expectTraversal(coll.iterator(), existing.iterator());
+        }
+    }
+
+    @Test
+    public void testEquals() {
+        ImplicitLinkedHashCollection<TestElement> coll1 = new ImplicitLinkedHashCollection<>();
+        coll1.add(new TestElement(1));
+        coll1.add(new TestElement(2));
+        coll1.add(new TestElement(3));
+
+        ImplicitLinkedHashCollection<TestElement> coll2 = new ImplicitLinkedHashCollection<>();
+        coll2.add(new TestElement(1));
+        coll2.add(new TestElement(2));
+        coll2.add(new TestElement(3));
+
+        ImplicitLinkedHashCollection<TestElement> coll3 = new ImplicitLinkedHashCollection<>();
+        coll3.add(new TestElement(1));
+        coll3.add(new TestElement(3));
+        coll3.add(new TestElement(2));
+
+        assertEquals(coll1, coll2);
+        assertNotEquals(coll1, coll3);
+        assertNotEquals(coll2, coll3);
+    }
+
+    private void addRandomElement(Random random, LinkedHashSet<Integer> existing,
+                                  ImplicitLinkedHashCollection<TestElement> set) {
+        int next;
+        do {
+            next = random.nextInt();
+        } while (existing.contains(next));
+        existing.add(next);
+        set.add(new TestElement(next));
+    }
+
+    private void removeRandomElement(Random random, Collection<Integer> existing,
+                                     ImplicitLinkedHashCollection<TestElement> coll) {
+        int removeIdx = random.nextInt(existing.size());
+        Iterator<Integer> iter = existing.iterator();
+        Integer element = null;
+        for (int i = 0; i <= removeIdx; i++) {
+            element = iter.next();
+        }
+        existing.remove(element);
+        coll.remove(new TestElement(element));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
similarity index 88%
rename from clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java
rename to clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
index 950deb8..8d2b850 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiSetTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
-import org.apache.kafka.common.utils.ImplicitLinkedHashSetTest.TestElement;
+import org.apache.kafka.common.utils.ImplicitLinkedHashCollectionTest.TestElement;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -32,21 +32,21 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * A unit test for ImplicitLinkedHashMultiSet.
+ * A unit test for ImplicitLinkedHashMultiCollection.
  */
-public class ImplicitLinkedHashMultiSetTest {
+public class ImplicitLinkedHashMultiCollectionTest {
     @Rule
     final public Timeout globalTimeout = Timeout.millis(120000);
 
     @Test
     public void testNullForbidden() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new ImplicitLinkedHashMultiCollection<>();
         assertFalse(multiSet.add(null));
     }
 
     @Test
     public void testInsertDelete() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>(100);
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new ImplicitLinkedHashMultiCollection<>(100);
         TestElement e1 = new TestElement(1);
         TestElement e2 = new TestElement(1);
         TestElement e3 = new TestElement(2);
@@ -64,7 +64,7 @@ public class ImplicitLinkedHashMultiSetTest {
 
     @Test
     public void testTraversal() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new ImplicitLinkedHashMultiCollection<>();
         expectExactTraversal(multiSet.iterator());
         TestElement e1 = new TestElement(1);
         TestElement e2 = new TestElement(1);
@@ -96,7 +96,7 @@ public class ImplicitLinkedHashMultiSetTest {
 
     @Test
     public void testEnlargement() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>(5);
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new ImplicitLinkedHashMultiCollection<>(5);
         assertEquals(11, multiSet.numSlots());
         TestElement[] testElements = {
             new TestElement(100),
@@ -126,7 +126,7 @@ public class ImplicitLinkedHashMultiSetTest {
     public void testManyInsertsAndDeletes() {
         Random random = new Random(123);
         LinkedList<TestElement> existing = new LinkedList<>();
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>();
+        ImplicitLinkedHashMultiCollection<TestElement> multiSet = new ImplicitLinkedHashMultiCollection<>();
         for (int i = 0; i < 100; i++) {
             for (int j = 0; j < 4; j++) {
                 TestElement testElement = new TestElement(random.nextInt());
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
deleted file mode 100644
index 156eba2..0000000
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashSetTest.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertEquals;
-
-/**
- * A unit test for ImplicitLinkedHashSet.
- */
-public class ImplicitLinkedHashSetTest {
-    @Rule
-    final public Timeout globalTimeout = Timeout.millis(120000);
-
-    final static class TestElement implements ImplicitLinkedHashSet.Element {
-        private int prev = ImplicitLinkedHashSet.INVALID_INDEX;
-        private int next = ImplicitLinkedHashSet.INVALID_INDEX;
-        private final int val;
-
-        TestElement(int val) {
-            this.val = val;
-        }
-
-        @Override
-        public int prev() {
-            return prev;
-        }
-
-        @Override
-        public void setPrev(int prev) {
-            this.prev = prev;
-        }
-
-        @Override
-        public int next() {
-            return next;
-        }
-
-        @Override
-        public void setNext(int next) {
-            this.next = next;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if ((o == null) || (o.getClass() != TestElement.class)) return false;
-            TestElement that = (TestElement) o;
-            return val == that.val;
-        }
-
-        @Override
-        public String toString() {
-            return "TestElement(" + val + ")";
-        }
-
-        @Override
-        public int hashCode() {
-            return val;
-        }
-    }
-
-    @Test
-    public void testNullForbidden() {
-        ImplicitLinkedHashMultiSet<TestElement> multiSet = new ImplicitLinkedHashMultiSet<>();
-        assertFalse(multiSet.add(null));
-    }
-
-    @Test
-    public void testInsertDelete() {
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(100);
-        assertTrue(set.add(new TestElement(1)));
-        TestElement second = new TestElement(2);
-        assertTrue(set.add(second));
-        assertTrue(set.add(new TestElement(3)));
-        assertFalse(set.add(new TestElement(3)));
-        assertEquals(3, set.size());
-        assertTrue(set.contains(new TestElement(1)));
-        assertFalse(set.contains(new TestElement(4)));
-        TestElement secondAgain = set.find(new TestElement(2));
-        assertTrue(second == secondAgain);
-        assertTrue(set.remove(new TestElement(1)));
-        assertFalse(set.remove(new TestElement(1)));
-        assertEquals(2, set.size());
-        set.clear();
-        assertEquals(0, set.size());
-    }
-
-    static void expectTraversal(Iterator<TestElement> iterator, Integer... sequence) {
-        int i = 0;
-        while (iterator.hasNext()) {
-            TestElement element = iterator.next();
-            Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but only " +
-                sequence.length + " were expected.", i < sequence.length);
-            Assert.assertEquals("Iterator value number " + (i + 1) + " was incorrect.",
-                sequence[i].intValue(), element.val);
-            i = i + 1;
-        }
-        Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but " +
-            sequence.length + " were expected.", i == sequence.length);
-    }
-
-    static void expectTraversal(Iterator<TestElement> iter, Iterator<Integer> expectedIter) {
-        int i = 0;
-        while (iter.hasNext()) {
-            TestElement element = iter.next();
-            Assert.assertTrue("Iterator yielded " + (i + 1) + " elements, but only " +
-                i + " were expected.", expectedIter.hasNext());
-            Integer expected = expectedIter.next();
-            Assert.assertEquals("Iterator value number " + (i + 1) + " was incorrect.",
-                expected.intValue(), element.val);
-            i = i + 1;
-        }
-        Assert.assertFalse("Iterator yielded " + i + " elements, but at least " +
-            (i + 1) + " were expected.", expectedIter.hasNext());
-    }
-
-    @Test
-    public void testTraversal() {
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
-        expectTraversal(set.iterator());
-        assertTrue(set.add(new TestElement(2)));
-        expectTraversal(set.iterator(), 2);
-        assertTrue(set.add(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 1);
-        assertTrue(set.add(new TestElement(100)));
-        expectTraversal(set.iterator(), 2, 1, 100);
-        assertTrue(set.remove(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 100);
-        assertTrue(set.add(new TestElement(1)));
-        expectTraversal(set.iterator(), 2, 100, 1);
-        Iterator<TestElement> iter = set.iterator();
-        iter.next();
-        iter.next();
-        iter.remove();
-        iter.next();
-        assertFalse(iter.hasNext());
-        expectTraversal(set.iterator(), 2, 1);
-        List<TestElement> list = new ArrayList<>();
-        list.add(new TestElement(1));
-        list.add(new TestElement(2));
-        assertTrue(set.removeAll(list));
-        assertFalse(set.removeAll(list));
-        expectTraversal(set.iterator());
-        assertEquals(0, set.size());
-        assertTrue(set.isEmpty());
-    }
-
-    @Test
-    public void testCollisions() {
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
-        assertEquals(11, set.numSlots());
-        assertTrue(set.add(new TestElement(11)));
-        assertTrue(set.add(new TestElement(0)));
-        assertTrue(set.add(new TestElement(22)));
-        assertTrue(set.add(new TestElement(33)));
-        assertEquals(11, set.numSlots());
-        expectTraversal(set.iterator(), 11, 0, 22, 33);
-        assertTrue(set.remove(new TestElement(22)));
-        expectTraversal(set.iterator(), 11, 0, 33);
-        assertEquals(3, set.size());
-        assertFalse(set.isEmpty());
-    }
-
-    @Test
-    public void testEnlargement() {
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>(5);
-        assertEquals(11, set.numSlots());
-        for (int i = 0; i < 6; i++) {
-            assertTrue(set.add(new TestElement(i)));
-        }
-        assertEquals(23, set.numSlots());
-        assertEquals(6, set.size());
-        expectTraversal(set.iterator(), 0, 1, 2, 3, 4, 5);
-        for (int i = 0; i < 6; i++) {
-            assertTrue("Failed to find element " + i, set.contains(new TestElement(i)));
-        }
-        set.remove(new TestElement(3));
-        assertEquals(23, set.numSlots());
-        assertEquals(5, set.size());
-        expectTraversal(set.iterator(), 0, 1, 2, 4, 5);
-    }
-
-    @Test
-    public void testManyInsertsAndDeletes() {
-        Random random = new Random(123);
-        LinkedHashSet<Integer> existing = new LinkedHashSet<>();
-        ImplicitLinkedHashSet<TestElement> set = new ImplicitLinkedHashSet<>();
-        for (int i = 0; i < 100; i++) {
-            addRandomElement(random, existing, set);
-            addRandomElement(random, existing, set);
-            addRandomElement(random, existing, set);
-            removeRandomElement(random, existing, set);
-            expectTraversal(set.iterator(), existing.iterator());
-        }
-    }
-
-    private void addRandomElement(Random random, LinkedHashSet<Integer> existing,
-                                  ImplicitLinkedHashSet<TestElement> set) {
-        int next;
-        do {
-            next = random.nextInt();
-        } while (existing.contains(next));
-        existing.add(next);
-        set.add(new TestElement(next));
-    }
-
-    private void removeRandomElement(Random random, Collection<Integer> existing,
-                             ImplicitLinkedHashSet<TestElement> set) {
-        int removeIdx = random.nextInt(existing.size());
-        Iterator<Integer> iter = existing.iterator();
-        Integer element = null;
-        for (int i = 0; i <= removeIdx; i++) {
-            element = iter.next();
-        }
-        existing.remove(element);
-        set.remove(new TestElement(element));
-    }
-}
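
For reference, the TestElement removed above is also the smallest useful illustration of
the renamed Element interface. Under the new class name an equivalent element would look
roughly like the sketch below (a sketch only; it assumes the prev/next index contract and
the INVALID_INDEX sentinel carry over unchanged, and SketchElement is an illustrative name):

    import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;

    // Minimal element for the renamed collection: the collection stores the
    // prev/next slot indexes inside the element itself, and identity is based
    // on value equality via equals()/hashCode().
    final class SketchElement implements ImplicitLinkedHashCollection.Element {
        private int prev = ImplicitLinkedHashCollection.INVALID_INDEX;
        private int next = ImplicitLinkedHashCollection.INVALID_INDEX;
        private final int val;

        SketchElement(int val) {
            this.val = val;
        }

        @Override
        public int prev() {
            return prev;
        }

        @Override
        public void setPrev(int prev) {
            this.prev = prev;
        }

        @Override
        public int next() {
            return next;
        }

        @Override
        public void setNext(int next) {
            this.next = next;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || o.getClass() != SketchElement.class) return false;
            return val == ((SketchElement) o).val;
        }

        @Override
        public int hashCode() {
            return val;
        }
    }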
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 968855a..34b2376 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -146,11 +146,11 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    public JoinGroupRequestData.JoinGroupRequestProtocolSet metadata() {
+    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
         configSnapshot = configStorage.snapshot();
         ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
         ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
-        return new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                 Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
                         .setName(DEFAULT_SUBPROTOCOL)
                         .setMetadata(metadata.array()))
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index cda5f61..3684025 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -190,7 +190,7 @@ public class WorkerCoordinatorTest {
 
         PowerMock.replayAll();
 
-        JoinGroupRequestData.JoinGroupRequestProtocolSet serialized = coordinator.metadata();
+        JoinGroupRequestData.JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
         assertEquals(1, serialized.size());
 
         Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 16ee872..fe97346 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.Records
 import org.apache.kafka.common.requests.FetchMetadata.{FINAL_EPOCH, INITIAL_EPOCH, INVALID_SESSION_ID}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, FetchMetadata => JFetchMetadata}
-import org.apache.kafka.common.utils.{ImplicitLinkedHashSet, Time, Utils}
+import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, Time, Utils}
 
 import scala.math.Ordered.orderingToOrdered
 import scala.collection.{mutable, _}
@@ -38,7 +38,7 @@ import scala.collection.JavaConverters._
 object FetchSession {
   type REQ_MAP = util.Map[TopicPartition, FetchRequest.PartitionData]
   type RESP_MAP = util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
-  type CACHE_MAP = ImplicitLinkedHashSet[CachedPartition]
+  type CACHE_MAP = ImplicitLinkedHashCollection[CachedPartition]
   type RESP_MAP_ITER = util.Iterator[util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]]]
 
   val NUM_INCREMENTAL_FETCH_SESSISONS = "NumIncrementalFetchSessions"
@@ -79,10 +79,10 @@ class CachedPartition(val topic: String,
                       var highWatermark: Long,
                       var fetcherLogStartOffset: Long,
                       var localLogStartOffset: Long)
-    extends ImplicitLinkedHashSet.Element {
+    extends ImplicitLinkedHashCollection.Element {
 
-  var cachedNext: Int = ImplicitLinkedHashSet.INVALID_INDEX
-  var cachedPrev: Int = ImplicitLinkedHashSet.INVALID_INDEX
+  var cachedNext: Int = ImplicitLinkedHashCollection.INVALID_INDEX
+  var cachedPrev: Int = ImplicitLinkedHashCollection.INVALID_INDEX
 
   override def next = cachedNext
   override def setNext(next: Int) = this.cachedNext = next
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 34ed7d7..302718d 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -47,9 +47,9 @@ import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal}
 import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet}
+import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection}
 import org.apache.kafka.common.message._
-import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet}
+import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -1355,7 +1355,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val requireKnownMemberId = joinGroupRequest.version >= 4 && groupInstanceId.isEmpty
 
       // let the coordinator handle join-group
-      val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
+      val protocols = joinGroupRequest.data().protocols().valuesList.asScala.map(protocol =>
         (protocol.name, protocol.metadata)).toList
       groupCoordinator.handleJoinGroup(
         joinGroupRequest.data().groupId,
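
The call site above relies on the new List view of the protocol collection. In Java the
same access would read roughly as follows (a sketch; it assumes valuesList() returns an
insertion-ordered java.util.List, as the Scala call above suggests, and handleProtocol()
is a hypothetical consumer of each entry):

    // Iterate join-group protocols in the order the client sent them.
    // valuesList() is assumed to be the List view of the collection.
    List<JoinGroupRequestData.JoinGroupRequestProtocol> ordered =
        joinGroupRequest.data().protocols().valuesList();
    for (JoinGroupRequestData.JoinGroupRequestProtocol protocol : ordered) {
        handleProtocol(protocol.name(), protocol.metadata());
    }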
@@ -1496,7 +1496,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
-    def sendResponseCallback(results: CreatableTopicResultSet): Unit = {
+    def sendResponseCallback(results: CreatableTopicResultCollection): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseData = new CreateTopicsResponseData().
           setThrottleTimeMs(requestThrottleMs).
@@ -1510,7 +1510,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val createTopicsRequest = request.body[CreateTopicsRequest]
-    val results = new CreatableTopicResultSet(createTopicsRequest.data().topics().size())
+    val results = new CreatableTopicResultCollection(createTopicsRequest.data().topics().size())
     if (!controller.isActive) {
       createTopicsRequest.data.topics.asScala.foreach { case topic =>
         results.add(new CreatableTopicResult().setName(topic.name()).
@@ -1594,7 +1594,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
-    def sendResponseCallback(results: DeletableTopicResultSet): Unit = {
+    def sendResponseCallback(results: DeletableTopicResultCollection): Unit = {
       def createResponse(requestThrottleMs: Int): AbstractResponse = {
         val responseData = new DeleteTopicsResponseData()
           .setThrottleTimeMs(requestThrottleMs)
@@ -1607,7 +1607,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
-    val results = new DeletableTopicResultSet(deleteTopicRequest.data.topicNames.size)
+    val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
       deleteTopicRequest.data.topicNames.asScala.foreach { case topic =>
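
The create-topics and delete-topics handlers above share the same accumulation pattern
with the renamed result collections. A hedged Java sketch of that pattern (only setName()
is shown; the error codes set in the real handlers are omitted):

    // Size the collection from the request, matching the handlers above,
    // then add one result entry per requested topic.
    CreatableTopicResultCollection results =
        new CreatableTopicResultCollection(createTopicsRequest.data().topics().size());
    for (CreatableTopic topic : createTopicsRequest.data().topics()) {
        results.add(new CreatableTopicResult().setName(topic.name()));
    }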
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index ee1c02b..ac8153c 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -34,8 +34,9 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
 import org.apache.kafka.common.message._
-import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigSet}
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Records, RecordBatch, SimpleRecord}
@@ -320,7 +321,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def createJoinGroupRequest = {
-    val protocolSet = new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+    val protocolSet = new JoinGroupRequestProtocolCollection(
       Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
         .setName("consumer-range")
         .setMetadata("test".getBytes())
@@ -395,7 +396,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def createTopicsRequest =
     new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(
-      new CreatableTopicSet(Collections.singleton(new CreatableTopic().
+      new CreatableTopicCollection(Collections.singleton(new CreatableTopic().
         setName(createTopic).setNumPartitions(1).
           setReplicationFactor(1.toShort)).iterator))).build()
 
@@ -422,7 +423,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val alterableConfig = new AlterableConfig
     alterableConfig.setName(LogConfig.MaxMessageBytesProp).
       setValue("1000000").setConfigOperation(AlterConfigOp.OpType.SET.id())
-    val alterableConfigSet = new AlterableConfigSet
+    val alterableConfigSet = new AlterableConfigCollection
     alterableConfigSet.add(alterableConfig)
     data.resources().add(new AlterConfigsResource().
       setResourceName(tp.topic).setResourceType(ConfigResource.Type.TOPIC.id()).
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index a54e5fc..514e7ae 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -41,7 +41,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
                 validateOnly: Boolean = false) = {
     val req = new CreateTopicsRequestData()
     req.setTimeoutMs(timeout)
-    req.setTopics(new CreatableTopicSet(topics.asJava.iterator()))
+    req.setTopics(new CreatableTopicCollection(topics.asJava.iterator()))
     req.setValidateOnly(validateOnly)
     new CreateTopicsRequest.Builder(req).build()
   }
@@ -68,7 +68,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
       topic.setReplicationFactor(1.toShort)
     }
     if (config != null) {
-      val effectiveConfigs = new CreateableTopicConfigSet()
+      val effectiveConfigs = new CreateableTopicConfigCollection()
       config.foreach {
         case (name, value) => {
           effectiveConfigs.add(new CreateableTopicConfig().setName(name).setValue(value))
@@ -77,7 +77,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
       topic.setConfigs(effectiveConfigs)
     }
     if (assignment != null) {
-      val effectiveAssignments = new CreatableReplicaAssignmentSet()
+      val effectiveAssignments = new CreatableReplicaAssignmentCollection()
       assignment.foreach {
         case (partitionIndex, brokerIdList) => {
           val effectiveAssignment = new CreatableReplicaAssignment()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 54316f3..b83dd92 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -48,6 +48,8 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.easymock.{Capture, EasyMock, IAnswer}
 import EasyMock._
+import org.apache.kafka.common.message.JoinGroupRequestData
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
 import org.apache.kafka.common.message.OffsetCommitRequestData
 import org.junit.Assert.{assertEquals, assertNull, assertTrue}
 import org.junit.{After, Test}
@@ -508,6 +510,41 @@ class KafkaApisTest {
     assertNull(partitionData.abortedTransactions)
   }
 
+  @Test
+  def testJoinGroupProtocolsOrder: Unit = {
+    val protocols = List(
+      new JoinGroupRequestProtocol().setName("first").setMetadata("first".getBytes()),
+      new JoinGroupRequestProtocol().setName("second").setMetadata("second".getBytes())
+    )
+
+    EasyMock.expect(groupCoordinator.handleJoinGroup(
+      anyString,
+      anyString,
+      anyObject(classOf[Option[String]]),
+      anyBoolean,
+      anyString,
+      anyString,
+      anyInt,
+      anyInt,
+      anyString,
+      EasyMock.eq(protocols.map(protocol => (protocol.name, protocol.metadata))),
+      anyObject()
+    ))
+
+    EasyMock.replay(groupCoordinator)
+
+    createKafkaApis().handleJoinGroupRequest(
+      buildRequest(
+        new JoinGroupRequest.Builder(
+          new JoinGroupRequestData()
+            .setGroupId("test")
+            .setMemberId("test")
+            .setProtocolType("consumer")
+            .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection(protocols.iterator.asJava))
+        )
+      )._2)
+  }
+
   /**
    * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
    */
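
The testJoinGroupProtocolsOrder test added above drives the check through KafkaApis; the
ordering can also be observed directly on the collection. A minimal sketch using the
iterator-based constructor that appears elsewhere in this patch (Arrays, Iterator, and
assertEquals imports omitted):

    // The collection's iterator is expected to yield elements in insertion order.
    JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
        new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
            Arrays.asList(
                new JoinGroupRequestData.JoinGroupRequestProtocol().setName("first"),
                new JoinGroupRequestData.JoinGroupRequestProtocol().setName("second")).iterator());
    Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> iter = protocols.iterator();
    assertEquals("first", iter.next().name());
    assertEquals("second", iter.next().name());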
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index b4c1268..52ac700 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -27,7 +27,9 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.message._
 import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType}
 import org.apache.kafka.common.{Node, TopicPartition}
-import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet}
+import org.apache.kafka.common.message.ControlledShutdownRequestData
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
+import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection
 import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -288,7 +290,7 @@ class RequestQuotaTest extends BaseRequestTest {
               .setGroupInstanceId(null)
               .setProtocolType("consumer")
               .setProtocols(
-                new JoinGroupRequestData.JoinGroupRequestProtocolSet(
+                new JoinGroupRequestProtocolCollection(
                   Collections.singletonList(new JoinGroupRequestData.JoinGroupRequestProtocol()
                     .setName("consumer-range")
                     .setMetadata("test".getBytes())).iterator()
@@ -324,7 +326,7 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.CREATE_TOPICS => {
           new CreateTopicsRequest.Builder(
             new CreateTopicsRequestData().setTopics(
-              new CreatableTopicSet(Collections.singleton(
+              new CreatableTopicCollection(Collections.singleton(
                 new CreatableTopic().setName("topic-2").setNumPartitions(1).
                   setReplicationFactor(1.toShort)).iterator())))
         }
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
index c8e70bb..d6cd5f3 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
@@ -120,8 +120,8 @@ public final class MessageDataGenerator {
             headerGenerator.addImport(MessageGenerator.MESSAGE_CLASS);
         }
         if (isSetElement) {
-            headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-            implementedInterfaces.add("ImplicitLinkedHashMultiSet.Element");
+            headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+            implementedInterfaces.add("ImplicitLinkedHashMultiCollection.Element");
         }
         Set<String> classModifiers = new HashSet<>();
         classModifiers.add("public");
@@ -152,9 +152,9 @@ public final class MessageDataGenerator {
 
     private void generateHashSet(String className, StructSpec struct) {
         buffer.printf("%n");
-        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-        buffer.printf("public static class %s extends ImplicitLinkedHashMultiSet<%s> {%n",
-            hashSetType(className), className);
+        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+        buffer.printf("public static class %s extends ImplicitLinkedHashMultiCollection<%s> {%n",
+            collectionType(className), className);
         buffer.incrementIndent();
         generateHashSetZeroArgConstructor(className);
         generateHashSetSizeArgConstructor(className);
@@ -166,7 +166,7 @@ public final class MessageDataGenerator {
     }
 
     private void generateHashSetZeroArgConstructor(String className) {
-        buffer.printf("public %s() {%n", hashSetType(className));
+        buffer.printf("public %s() {%n", collectionType(className));
         buffer.incrementIndent();
         buffer.printf("super();%n");
         buffer.decrementIndent();
@@ -175,7 +175,7 @@ public final class MessageDataGenerator {
     }
 
     private void generateHashSetSizeArgConstructor(String className) {
-        buffer.printf("public %s(int expectedNumElements) {%n", hashSetType(className));
+        buffer.printf("public %s(int expectedNumElements) {%n", collectionType(className));
         buffer.incrementIndent();
         buffer.printf("super(expectedNumElements);%n");
         buffer.decrementIndent();
@@ -185,7 +185,7 @@ public final class MessageDataGenerator {
 
     private void generateHashSetIteratorConstructor(String className) {
         headerGenerator.addImport(MessageGenerator.ITERATOR_CLASS);
-        buffer.printf("public %s(Iterator<%s> iterator) {%n", hashSetType(className), className);
+        buffer.printf("public %s(Iterator<%s> iterator) {%n", collectionType(className), className);
         buffer.incrementIndent();
         buffer.printf("super(iterator);%n");
         buffer.decrementIndent();
@@ -199,7 +199,7 @@ public final class MessageDataGenerator {
             commaSeparatedHashSetFieldAndTypes(struct));
         buffer.incrementIndent();
         generateKeyElement(className, struct);
-        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
+        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
         buffer.printf("return find(key);%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
@@ -212,7 +212,7 @@ public final class MessageDataGenerator {
             commaSeparatedHashSetFieldAndTypes(struct));
         buffer.incrementIndent();
         generateKeyElement(className, struct);
-        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
+        headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
         buffer.printf("return findAll(key);%n");
         buffer.decrementIndent();
         buffer.printf("}%n");
@@ -283,8 +283,8 @@ public final class MessageDataGenerator {
         }
     }
 
-    private static String hashSetType(String baseType) {
-        return baseType + "Set";
+    private static String collectionType(String baseType) {
+        return baseType + "Collection";
     }
 
     private String fieldAbstractJavaType(FieldSpec field) {
@@ -307,8 +307,8 @@ public final class MessageDataGenerator {
         } else if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-                return hashSetType(arrayType.elementType().toString());
+                headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+                return collectionType(arrayType.elementType().toString());
             } else {
                 headerGenerator.addImport(MessageGenerator.LIST_CLASS);
                 return "List<" + getBoxedJavaType(arrayType.elementType()) + ">";
@@ -322,8 +322,8 @@ public final class MessageDataGenerator {
         if (field.type().isArray()) {
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_SET_CLASS);
-                return hashSetType(arrayType.elementType().toString());
+                headerGenerator.addImport(MessageGenerator.IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS);
+                return collectionType(arrayType.elementType().toString());
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
                 return "ArrayList<" + getBoxedJavaType(arrayType.elementType()) + ">";
@@ -1233,7 +1233,7 @@ public final class MessageDataGenerator {
             }
             FieldType.ArrayType arrayType = (FieldType.ArrayType) field.type();
             if (field.toStruct().hasKeys()) {
-                return "new " + hashSetType(arrayType.elementType().toString()) + "(0)";
+                return "new " + collectionType(arrayType.elementType().toString()) + "(0)";
             } else {
                 headerGenerator.addImport(MessageGenerator.ARRAYLIST_CLASS);
                 return "new ArrayList<" + getBoxedJavaType(arrayType.elementType()) + ">()";
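
Taken together, the generator changes above mean that a keyed array field now produces a
nested collection class along these lines (an illustrative sketch using the
CreatableTopicCollection name from the request data; the generated find/findAll helpers
are omitted):

    // Shape of a generated collection: it extends the renamed multi-collection and
    // exposes the zero-arg, size-arg, and iterator constructors generated above.
    public static class CreatableTopicCollection
            extends ImplicitLinkedHashMultiCollection<CreatableTopic> {
        public CreatableTopicCollection() {
            super();
        }

        public CreatableTopicCollection(int expectedNumElements) {
            super(expectedNumElements);
        }

        public CreatableTopicCollection(Iterator<CreatableTopic> iterator) {
            super(iterator);
        }
    }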
diff --git a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
index 18295c6..e227ded 100644
--- a/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
+++ b/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java
@@ -59,8 +59,8 @@ public final class MessageGenerator {
 
     static final String ARRAYLIST_CLASS = "java.util.ArrayList";
 
-    static final String IMPLICIT_LINKED_HASH_MULTI_SET_CLASS =
-        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiSet";
+    static final String IMPLICIT_LINKED_HASH_MULTI_COLLECTION_CLASS =
+        "org.apache.kafka.common.utils.ImplicitLinkedHashMultiCollection";
 
     static final String UNSUPPORTED_VERSION_EXCEPTION_CLASS =
         "org.apache.kafka.common.errors.UnsupportedVersionException";