You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2021/10/05 18:39:22 UTC

[kafka] branch trunk updated: MINOR: TopicIdPartition improvements (#11374)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1a3e23a  MINOR: TopicIdPartition improvements (#11374)
1a3e23a is described below

commit 1a3e23a5798373fb40f31af2eb7c4348270ac437
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Oct 5 11:37:38 2021 -0700

    MINOR: TopicIdPartition improvements (#11374)
    
    1. It should not require a TopicPartition during construction and normal
    usage.
    2. Simplify `equals` since `topicId` and `topicPartition` are never
    null.
    3. Inline `Objects.hash` to avoid array allocation.
    4. Make `toString` more concise using a similar approach as
    `TopicPartition` since this `TopicIdPartition` will replace
    `TopicPartition` in many places in the future.
    5. Add unit tests for `TopicIdPartition`, it seems like we had none.
    6. Minor clean-up in calling/called classes.
    
    Reviewers: David Jacot <dj...@confluent.io>, Satish Duggana <sa...@apache.org>
---
 .../org/apache/kafka/common/TopicIdPartition.java  | 35 +++++++++---
 .../org/apache/kafka/common/TopicPartition.java    |  3 +-
 .../apache/kafka/common/TopicIdPartitionTest.java  | 63 ++++++++++++++++++++++
 .../storage/RemoteLogMetadataTopicPartitioner.java |  2 +-
 .../RemoteLogSegmentMetadataTransform.java         |  4 +-
 .../RemoteLogSegmentMetadataUpdateTransform.java   |  4 +-
 .../RemotePartitionDeleteMetadataTransform.java    |  4 +-
 7 files changed, 99 insertions(+), 16 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
index 9fb570a..027648f 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java
@@ -32,6 +32,13 @@ public class TopicIdPartition {
         this.topicPartition = Objects.requireNonNull(topicPartition, "topicPartition can not be null");
     }
 
+    public TopicIdPartition(String topic, Uuid topicId, int partition) {
+        this.topicId = Objects.requireNonNull(topicId, "topicId can not be null");
+        this.topicPartition = new TopicPartition(
+            Objects.requireNonNull(topic, "topic can not be null"),
+            partition);
+    }
+
     /**
      * @return Universally unique id representing this topic partition.
      */
@@ -40,6 +47,20 @@ public class TopicIdPartition {
     }
 
     /**
+     * @return the topic name.
+     */
+    public String topic() {
+        return topicPartition.topic();
+    }
+
+    /**
+     * @return the partition id.
+     */
+    public int partition() {
+        return topicPartition.partition();
+    }
+
+    /**
      * @return Topic partition representing this instance.
      */
     public TopicPartition topicPartition() {
@@ -55,20 +76,20 @@ public class TopicIdPartition {
             return false;
         }
         TopicIdPartition that = (TopicIdPartition) o;
-        return Objects.equals(topicId, that.topicId) &&
-               Objects.equals(topicPartition, that.topicPartition);
+        return topicId.equals(that.topicId) &&
+               topicPartition.equals(that.topicPartition);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topicId, topicPartition);
+        final int prime = 31;
+        int result = prime + topicId.hashCode();
+        result = prime * result + topicPartition.hashCode();
+        return result;
     }
 
     @Override
     public String toString() {
-        return "TopicIdPartition{" +
-               "topicId=" + topicId +
-               ", topicPartition=" + topicPartition +
-               '}';
+        return topicId + ":" + topic() + "-" + partition();
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
index 2c6add7..7c8fe79 100644
--- a/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
+++ b/clients/src/main/java/org/apache/kafka/common/TopicPartition.java
@@ -47,8 +47,7 @@ public final class TopicPartition implements Serializable {
         if (hash != 0)
             return hash;
         final int prime = 31;
-        int result = 1;
-        result = prime * result + partition;
+        int result = prime + partition;
         result = prime * result + Objects.hashCode(topic);
         this.hash = result;
         return result;
diff --git a/clients/src/test/java/org/apache/kafka/common/TopicIdPartitionTest.java b/clients/src/test/java/org/apache/kafka/common/TopicIdPartitionTest.java
new file mode 100644
index 0000000..08de64f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/TopicIdPartitionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+import java.util.Objects;
+import org.junit.jupiter.api.Test;
+
+class TopicIdPartitionTest {
+
+    private final Uuid topicId0 = new Uuid(-4883993789924556279L, -5960309683534398572L);
+    private final String topicName0 = "a_topic_name";
+    private final int partition1 = 1;
+    private final TopicPartition topicPartition0 = new TopicPartition(topicName0, partition1);
+    private final TopicIdPartition topicIdPartition0 = new TopicIdPartition(topicId0, topicPartition0);
+    private final TopicIdPartition topicIdPartition1 = new TopicIdPartition(topicName0, topicId0,
+        partition1);
+
+    private final Uuid topicId1 = new Uuid(7759286116672424028L, -5081215629859775948L);
+    private final String topicName1 = "another_topic_name";
+    private final TopicIdPartition topicIdPartition2 = new TopicIdPartition(topicName1, topicId1,
+        partition1);
+
+    @Test
+    public void testEquals() {
+        assertEquals(topicIdPartition0, topicIdPartition1);
+        assertEquals(topicIdPartition1, topicIdPartition0);
+
+        assertNotEquals(topicIdPartition0, topicIdPartition2);
+        assertNotEquals(topicIdPartition2, topicIdPartition0);
+    }
+
+    @Test
+    public void testHashCode() {
+        assertEquals(Objects.hash(topicIdPartition0.topicId(), topicIdPartition0.topicPartition()),
+            topicIdPartition0.hashCode());
+        assertEquals(topicIdPartition0.hashCode(), topicIdPartition1.hashCode());
+        assertNotEquals(topicIdPartition0.hashCode(), topicIdPartition2.hashCode());
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("vDiRhkpVQgmtSLnsAZx7lA:a_topic_name-1", topicIdPartition0.toString());
+    }
+
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
index 6622214..af12647 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
@@ -43,7 +43,7 @@ public class RemoteLogMetadataTopicPartitioner {
         // We do not want to depend upon hash code generation of Uuid as that may change.
         int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
                                 topicIdPartition.topicId().getMostSignificantBits(),
-                                topicIdPartition.topicPartition().partition());
+                                topicIdPartition.partition());
 
         return toBytes(hash);
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 375c533..4282b9e 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -60,8 +60,8 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
                 .setTopicIdPartition(
                         new RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
                                 .setId(data.remoteLogSegmentId().topicIdPartition().topicId())
-                                .setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic())
-                                .setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition()))
+                                .setName(data.remoteLogSegmentId().topicIdPartition().topic())
+                                .setPartition(data.remoteLogSegmentId().topicIdPartition().partition()))
                 .setId(data.remoteLogSegmentId().id());
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
index 4ad8277..3db7765 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataUpdateTransform.java
@@ -51,8 +51,8 @@ public class RemoteLogSegmentMetadataUpdateTransform implements RemoteLogMetadat
                 .setId(data.remoteLogSegmentId().id())
                 .setTopicIdPartition(
                         new RemoteLogSegmentMetadataUpdateRecord.TopicIdPartitionEntry()
-                                .setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic())
-                                .setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition())
+                                .setName(data.remoteLogSegmentId().topicIdPartition().topic())
+                                .setPartition(data.remoteLogSegmentId().topicIdPartition().partition())
                                 .setId(data.remoteLogSegmentId().topicIdPartition().topicId()));
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java
index 27f4f1f..d94830f 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemotePartitionDeleteMetadataTransform.java
@@ -37,8 +37,8 @@ public final class RemotePartitionDeleteMetadataTransform implements RemoteLogMe
 
     private RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry createTopicIdPartitionEntry(TopicIdPartition topicIdPartition) {
         return new RemotePartitionDeleteMetadataRecord.TopicIdPartitionEntry()
-                .setName(topicIdPartition.topicPartition().topic())
-                .setPartition(topicIdPartition.topicPartition().partition())
+                .setName(topicIdPartition.topic())
+                .setPartition(topicIdPartition.partition())
                 .setId(topicIdPartition.topicId());
     }