You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@ignite.apache.org by "denis-chudov (via GitHub)" <gi...@apache.org> on 2023/05/29 18:44:21 UTC

[GitHub] [ignite-3] denis-chudov opened a new pull request, #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

denis-chudov opened a new pull request, #2115:
URL: https://github.com/apache/ignite-3/pull/2115

   https://issues.apache.org/jira/browse/IGNITE-19478


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211336859


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211326530


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);

Review Comment:
   thanks, fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211321154


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2004,7 +2005,7 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
 
         Set<Assignment> stableAssignments = stableAssignmentsBytes == null
                 // This is for the case when the first rebalance occurs.
-                ? ((List<Set<Assignment>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
+                ? Assignments.fromBytes(tblCfg.assignments().value()).get(partId)
                 : ByteUtils.fromBytes(stableAssignmentsBytes);

Review Comment:
   `stableAssignmentsBytes` is taken from different meta storage entries, managed by rebalance. If it is `null`, assignments for partition are taken from table configuration. Serialization was reworked only for table configuration, not for rebalance keys.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211354394


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);

Review Comment:
   maybe, if we make this an utility method, as @sanpwc suggested, there should be no such assumption, size should be Integer instead



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov closed pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov closed pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage
URL: https://github.com/apache/ignite-3/pull/2115


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211355178


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());

Review Comment:
   agree



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sanpwc commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "sanpwc (via GitHub)" <gi...@apache.org>.
sanpwc commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211229240


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {

Review Comment:
   Looks like an utility function. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] sanpwc commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "sanpwc (via GitHub)" <gi...@apache.org>.
sanpwc commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211230205


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {

Review Comment:
   Same as above, looks like an utility function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] denis-chudov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "denis-chudov (via GitHub)" <gi...@apache.org>.
denis-chudov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211358362


##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short length = buf.getShort();
+        assert length >= 0 : "Negative collection size: " + length;
+
+        List<T> result = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            short size = buf.getShort();
+            assert size >= 0 : "Negative object size: " + size + ", index=" + i;
+            byte[] arr = new byte[size];
+            buf.get(arr);
+            result.add(transform.apply(arr));
+        }
+
+        return result;
+    }
+
+    private static byte[] assignmentToBytes(Assignment assignment, Function<String, Integer> consistentIdToIndex) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+        buf.putInt(consistentIdToIndex.apply(assignment.consistentId()));
+        buf.put((byte) (assignment.isPeer() ? 1 : 0));
+        return buf.array();
+    }
+
+    private static Assignment bytesToAssignment(byte[] bytes, Function<Integer, String> indexToConsistentId) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        int index = buf.getInt();
+        boolean isPeer = buf.get() == 1;
+        String consistentId = indexToConsistentId.apply(index);
+        return isPeer ? forPeer(consistentId) : forLearner(consistentId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Assignments that = (Assignments) o;
+
+        return assignments != null ? assignments.equals(that.assignments) : that.assignments == null;

Review Comment:
   It can't. Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [ignite-3] ibessonov commented on a diff in pull request #2115: IGNITE-19478 Possible excess size of table entities written into meta storage

Posted by "ibessonov (via GitHub)" <gi...@apache.org>.
ibessonov commented on code in PR #2115:
URL: https://github.com/apache/ignite-3/pull/2115#discussion_r1211198175


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -2004,7 +2005,7 @@ private CompletableFuture<Void> handleChangePendingAssignmentEvent(
 
         Set<Assignment> stableAssignments = stableAssignmentsBytes == null
                 // This is for the case when the first rebalance occurs.
-                ? ((List<Set<Assignment>>) ByteUtils.fromBytes(tblCfg.assignments().value())).get(partId)
+                ? Assignments.fromBytes(tblCfg.assignments().value()).get(partId)
                 : ByteUtils.fromBytes(stableAssignmentsBytes);

Review Comment:
   This one still uses ByteUtils, is this necessary?



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);

Review Comment:
   You should always pass encoding into `getBytes`



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;

Review Comment:
   Minor suggestion, but making this variable an `Integer` will result in less garbage



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);

Review Comment:
   I feel like this (`buf.putInt(assignmentsBytes.length)`) is unnecessary, you can calculate the remaining size from what you have already read



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);

Review Comment:
   You forgot to explicitly state buffers endianness. Please do it



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);

Review Comment:
   Same here, endianness is missing



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());

Review Comment:
   Like here, there must be a check that the conversion is safe, otherwise the structure is broken



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);

Review Comment:
   I think all these assumptions about sizes must be explicitly documented



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short length = buf.getShort();
+        assert length >= 0 : "Negative collection size: " + length;
+
+        List<T> result = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            short size = buf.getShort();
+            assert size >= 0 : "Negative object size: " + size + ", index=" + i;
+            byte[] arr = new byte[size];
+            buf.get(arr);
+            result.add(transform.apply(arr));
+        }
+
+        return result;
+    }
+
+    private static byte[] assignmentToBytes(Assignment assignment, Function<String, Integer> consistentIdToIndex) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+        buf.putInt(consistentIdToIndex.apply(assignment.consistentId()));
+        buf.put((byte) (assignment.isPeer() ? 1 : 0));

Review Comment:
   Seems a bit wasteful. We could encode the "peer" bit in the consistent id itself.
   By the way, do we use this class for CMG/Meta-Storage or not? They are replicated, in a nutshell, so this encoding would result in huge payloads anyway



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short length = buf.getShort();
+        assert length >= 0 : "Negative collection size: " + length;
+
+        List<T> result = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            short size = buf.getShort();
+            assert size >= 0 : "Negative object size: " + size + ", index=" + i;
+            byte[] arr = new byte[size];

Review Comment:
   Again, I recommend using byte buffers and not waste memory



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));

Review Comment:
   This method may have a `ByteBuffer` as a parameter, would be more convenient I believe



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];

Review Comment:
   Instead of making an entire new array, you can use a byte buffer slice (or even the same buffer with corresponding position and limit, that's even better) and not waste so much memory. Same for other arrays



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short length = buf.getShort();
+        assert length >= 0 : "Negative collection size: " + length;
+
+        List<T> result = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            short size = buf.getShort();
+            assert size >= 0 : "Negative object size: " + size + ", index=" + i;
+            byte[] arr = new byte[size];
+            buf.get(arr);
+            result.add(transform.apply(arr));
+        }
+
+        return result;
+    }
+
+    private static byte[] assignmentToBytes(Assignment assignment, Function<String, Integer> consistentIdToIndex) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+        buf.putInt(consistentIdToIndex.apply(assignment.consistentId()));
+        buf.put((byte) (assignment.isPeer() ? 1 : 0));
+        return buf.array();
+    }
+
+    private static Assignment bytesToAssignment(byte[] bytes, Function<Integer, String> indexToConsistentId) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        int index = buf.getInt();
+        boolean isPeer = buf.get() == 1;
+        String consistentId = indexToConsistentId.apply(index);
+        return isPeer ? forPeer(consistentId) : forLearner(consistentId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Assignments that = (Assignments) o;
+
+        return assignments != null ? assignments.equals(that.assignments) : that.assignments == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return assignments != null ? assignments.hashCode() : 0;

Review Comment:
   How can it be null?



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());

Review Comment:
   For example, you're assuming that there can't be more than 65k partitions, which will not be true in the future. So we should use int for the header instead of short



##########
modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java:
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import static org.apache.ignite.internal.affinity.Assignment.forLearner;
+import static org.apache.ignite.internal.affinity.Assignment.forPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * Represents affinity assignments.
+ */
+public class Assignments {
+    private final List<Set<Assignment>> assignments;
+
+    public Assignments(List<Set<Assignment>> assignments) {
+        this.assignments = assignments;
+    }
+
+    public List<Set<Assignment>> assignments() {
+        return assignments;
+    }
+
+    /**
+     * Partitions count.
+     *
+     * @return Partitions count.
+     */
+    public int size() {
+        return assignments.size();
+    }
+
+    /**
+     * Gets an assignments for the given partition.
+     *
+     * @param part Partition.
+     * @return Assignments for the given partition.
+     */
+    public Set<Assignment> get(int part) {
+        return assignments.get(part);
+    }
+
+    /**
+     * Sets an assignment for the given partition.
+     *
+     * @param part Partition.
+     * @param assignment Assignments for the given partition.
+     */
+    public void set(int part, Set<Assignment> assignment) {
+        assignments.set(part, assignment);
+    }
+
+    /**
+     * Bytes representation of the assignments.
+     *
+     * @return Bytes representation of the assignments.
+     */
+    public byte[] bytes() {
+        Map<String, Integer> consistentIds = new LinkedHashMap<>();
+
+        int idx = 0;
+        for (Set<Assignment> assignmentSet : assignments) {
+            for (Assignment a : assignmentSet) {
+                if (consistentIds.putIfAbsent(a.consistentId(), idx) == null) {
+                    idx++;
+                }
+            }
+        }
+
+        byte[] consistentIdsBytes = collectionToBytes(consistentIds.keySet(), String::getBytes);
+
+        byte[] assignmentsBytes = collectionToBytes(
+                assignments,
+                set -> collectionToBytes(set, a -> assignmentToBytes(a, consistentIds::get))
+        );
+
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES * 2 + consistentIdsBytes.length + assignmentsBytes.length);
+
+        buf.putInt(consistentIdsBytes.length);
+        buf.put(consistentIdsBytes);
+        buf.putInt(assignmentsBytes.length);
+        buf.put(assignmentsBytes);
+
+        return buf.array();
+    }
+
+    /**
+     * Creates {@link Assignments} from the given byte array.
+     *
+     * @param bytes Byte array representation of the assignments.
+     * @return Assignments.
+     */
+    public static Assignments fromBytes(byte[] bytes) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        int consistentIdsBytesLength = buf.getInt();
+        byte[] consistentIdsBytes = new byte[consistentIdsBytesLength];
+        buf.get(consistentIdsBytes);
+
+        int assignmentsBytesLength = buf.getInt();
+        byte[] assignmentsBytes = new byte[assignmentsBytesLength];
+        buf.get(assignmentsBytes);
+
+        List<String> consistentIds = bytesToList(consistentIdsBytes, b -> new String(b, StandardCharsets.UTF_8));
+
+        List<Set<Assignment>> assignments = bytesToList(
+                assignmentsBytes,
+                b -> new HashSet<>(bytesToList(b, ab -> bytesToAssignment(ab, consistentIds::get)))
+        );
+
+        return new Assignments(assignments);
+    }
+
+    /**
+     * Serializes collection to bytes.
+     *
+     * @param collection Collection.
+     * @param transform Tranform function for the collection element.
+     * @return Byte array.
+     */
+    private <T> byte[] collectionToBytes(Collection<T> collection, Function<T, byte[]> transform) {
+        int bytesObjects = 0;
+        List<byte[]> objects = new ArrayList<>();
+
+        for (T o : collection) {
+            byte[] b = transform.apply(o);
+            objects.add(b);
+            bytesObjects += b.length;
+        }
+
+        bytesObjects += Short.BYTES * (objects.size() + 1);
+
+        ByteBuffer buf = ByteBuffer.allocate(bytesObjects);
+
+        buf.putShort((short) objects.size());
+
+        for (byte[] o : objects) {
+            buf.putShort((short) o.length);
+            buf.put(o);
+        }
+
+        return buf.array();
+    }
+
+    /**
+     * Deserializes the list from byte array.
+     *
+     * @param bytes Byte array.
+     * @param transform Transform function to create list element.
+     * @return List.
+     */
+    private static <T> List<T> bytesToList(byte[] bytes, Function<byte[], T> transform) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+
+        short length = buf.getShort();
+        assert length >= 0 : "Negative collection size: " + length;
+
+        List<T> result = new ArrayList<>(length);
+
+        for (int i = 0; i < length; i++) {
+            short size = buf.getShort();
+            assert size >= 0 : "Negative object size: " + size + ", index=" + i;
+            byte[] arr = new byte[size];
+            buf.get(arr);
+            result.add(transform.apply(arr));
+        }
+
+        return result;
+    }
+
+    private static byte[] assignmentToBytes(Assignment assignment, Function<String, Integer> consistentIdToIndex) {
+        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + Byte.BYTES);
+        buf.putInt(consistentIdToIndex.apply(assignment.consistentId()));
+        buf.put((byte) (assignment.isPeer() ? 1 : 0));
+        return buf.array();
+    }
+
+    private static Assignment bytesToAssignment(byte[] bytes, Function<Integer, String> indexToConsistentId) {
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        int index = buf.getInt();
+        boolean isPeer = buf.get() == 1;
+        String consistentId = indexToConsistentId.apply(index);
+        return isPeer ? forPeer(consistentId) : forLearner(consistentId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        Assignments that = (Assignments) o;
+
+        return assignments != null ? assignments.equals(that.assignments) : that.assignments == null;

Review Comment:
   There's a method in `Objects` class that does null-checks. Anyway, the same question, how can it be null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@ignite.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org