You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2023/02/16 15:13:58 UTC

[kafka] branch trunk updated: MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba0c5b0902d MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
ba0c5b0902d is described below

commit ba0c5b0902d4b259505cf4a7c2a45e98182a372b
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Thu Feb 16 15:13:31 2023 +0000

    MINOR: Simplify JUnit assertions in tests; remove accidental unnecessary code in tests (#13219)
    
    * assertEquals called on array
    * Method is identical to its super method
    * Simplifiable assertions
    * Unused imports
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Divij Vaidya <di...@amazon.com>
---
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  3 +-
 .../consumer/ConsumerPartitionAssignorTest.java    | 22 ----------
 .../common/header/internals/RecordHeadersTest.java |  4 +-
 .../apache/kafka/common/network/SelectorTest.java  |  3 --
 .../utils/ImplicitLinkedHashCollectionTest.java    |  7 ++--
 .../ImplicitLinkedHashMultiCollectionTest.java     |  8 ++--
 .../auth/extension/JaasBasicAuthFilterTest.java    |  3 +-
 .../connect/integration/BlockingConnectorTest.java | 10 -----
 .../runtime/AbstractWorkerSourceTaskTest.java      |  9 ++--
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |  4 +-
 .../runtime/rest/entities/PluginInfoTest.java      | 12 +++---
 .../controller/BrokerHeartbeatManagerTest.java     |  3 +-
 .../kafka/controller/QuorumControllerTest.java     |  4 +-
 .../controller/ReplicationControlManagerTest.java  |  4 +-
 .../apache/kafka/controller/ResultOrErrorTest.java | 20 +++++----
 .../org/apache/kafka/image/TopicsImageTest.java    |  5 ++-
 .../kafka/metadata/KafkaConfigSchemaTest.java      |  5 ++-
 .../authorizer/StandardAuthorizerTest.java         | 33 +++++++--------
 .../apache/kafka/metalog/LocalLogManagerTest.java  |  7 ++--
 .../apache/kafka/timeline/BaseHashTableTest.java   | 25 +++++------
 .../kafka/timeline/SnapshottableHashTableTest.java | 38 +++++++++--------
 .../apache/kafka/timeline/TimelineHashMapTest.java | 14 ++++---
 .../org/apache/kafka/shell/GlobComponentTest.java  |  7 ++--
 .../org/apache/kafka/shell/MetadataNodeTest.java   |  6 +--
 .../storage/RemoteLogSegmentLifecycleTest.java     |  4 --
 .../org/apache/kafka/streams/KeyValueTest.java     | 48 +++++++++++-----------
 .../integration/StoreUpgradeIntegrationTest.java   |  8 ----
 .../internals/KStreamNewProcessorApiTest.java      |  4 --
 .../internals/graph/GraphGraceSearchUtilTest.java  |  9 ----
 .../internals/graph/TableProcessorNodeTest.java    |  7 ----
 .../CachingInMemoryKeyValueStoreTest.java          |  6 ---
 .../MeteredTimestampedKeyValueStoreTest.java       |  3 +-
 .../streams/state/internals/ThreadCacheTest.java   |  4 +-
 .../internals/TimeOrderedWindowStoreTest.java      |  3 --
 .../kafka/streams/TopologyTestDriverTest.java      | 17 ++++----
 35 files changed, 155 insertions(+), 214 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d050f08e320..bad3e391bfa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -229,7 +229,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
@@ -5725,7 +5724,7 @@ public class KafkaAdminClientTest {
     }
 
     private ClientQuotaEntity newClientQuotaEntity(String... args) {
-        assertTrue(args.length % 2 == 0);
+        assertEquals(0, args.length % 2);
 
         Map<String, String> entityMap = new HashMap<>(args.length / 2);
         for (int index = 0; index < args.length; index += 2) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
index 1298f8c23bd..635f38f7f38 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignorTest.java
@@ -22,12 +22,10 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.junit.jupiter.api.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -112,31 +110,11 @@ public class ConsumerPartitionAssignorTest {
 
     public static class TestConsumerPartitionAssignor implements ConsumerPartitionAssignor {
 
-        @Override
-        public ByteBuffer subscriptionUserData(Set<String> topics) {
-            return ConsumerPartitionAssignor.super.subscriptionUserData(topics);
-        }
-
         @Override
         public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
             return null;
         }
 
-        @Override
-        public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
-            ConsumerPartitionAssignor.super.onAssignment(assignment, metadata);
-        }
-
-        @Override
-        public List<RebalanceProtocol> supportedProtocols() {
-            return ConsumerPartitionAssignor.super.supportedProtocols();
-        }
-
-        @Override
-        public short version() {
-            return ConsumerPartitionAssignor.super.version();
-        }
-
         @Override
         public String name() {
             // use the RangeAssignor's name to cause naming conflict
diff --git a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
index e6c66fcdba4..0791eca8e42 100644
--- a/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/header/internals/RecordHeadersTest.java
@@ -21,9 +21,9 @@ import org.apache.kafka.common.header.Headers;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Iterator;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -230,7 +230,7 @@ public class RecordHeadersTest {
     
     static void assertHeader(String key, String value, Header actual) {
         assertEquals(key, actual.key());
-        assertTrue(Arrays.equals(value.getBytes(), actual.value()));
+        assertArrayEquals(value.getBytes(), actual.value());
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 618e86e1dae..988aac59803 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -430,9 +430,6 @@ public class SelectorTest {
                     MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
                 throw new RuntimeException("Test exception");
             }
-            @Override
-            public void close() {
-            }
         };
         Selector selector = new Selector(CONNECTION_MAX_IDLE_MS, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         SocketChannel socketChannel = SocketChannel.open();
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
index d62595a2ae4..0c5828f565a 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashCollectionTest.java
@@ -31,6 +31,7 @@ import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -125,7 +126,7 @@ public class ImplicitLinkedHashCollectionTest {
         assertTrue(coll.contains(new TestElement(1)));
         assertFalse(coll.contains(new TestElement(4)));
         TestElement secondAgain = coll.find(new TestElement(2));
-        assertTrue(second == secondAgain);
+        assertSame(second, secondAgain);
         assertTrue(coll.remove(new TestElement(1)));
         assertFalse(coll.remove(new TestElement(1)));
         assertEquals(2, coll.size());
@@ -142,8 +143,8 @@ public class ImplicitLinkedHashCollectionTest {
             assertEquals(sequence[i].intValue(), element.key, "Iterator value number " + (i + 1) + " was incorrect.");
             i = i + 1;
         }
-        assertTrue(i == sequence.length, "Iterator yieled " + (i + 1) + " elements, but " +
-            sequence.length + " were expected.");
+        assertEquals(sequence.length, i, "Iterator yieled " + (i + 1) + " elements, but " +
+                sequence.length + " were expected.");
     }
 
     static void expectTraversal(Iterator<TestElement> iter, Iterator<Integer> expectedIter) {
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
index 9b3df922461..2d3c4b519a1 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ImplicitLinkedHashMultiCollectionTest.java
@@ -27,6 +27,7 @@ import java.util.Random;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -97,8 +98,8 @@ public class ImplicitLinkedHashMultiCollectionTest {
             }
             i = i + 1;
         }
-        assertTrue(i == sequence.length, "Iterator yieled " + (i + 1) + " elements, but " +
-            sequence.length + " were expected.");
+        assertEquals(sequence.length, i, "Iterator yieled " + (i + 1) + " elements, but " +
+                sequence.length + " were expected.");
     }
 
     @Test
@@ -160,8 +161,7 @@ public class ImplicitLinkedHashMultiCollectionTest {
             assertTrue(expectedIter.hasNext(),
                 "Iterator yieled " + (i + 1) + " elements, but only " + i + " were expected.");
             TestElement expected = expectedIter.next();
-            assertTrue(expected == element,
-                "Iterator value number " + (i + 1) + " was incorrect.");
+            assertSame(expected, element, "Iterator value number " + (i + 1) + " was incorrect.");
             i = i + 1;
         }
         assertFalse(expectedIter.hasNext(),
diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
index bd5e060f1e9..826a41fafe4 100644
--- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
+++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java
@@ -47,6 +47,7 @@ import javax.ws.rs.core.Response;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
@@ -230,7 +231,7 @@ public class JaasBasicAuthFilterTest {
         ArgumentCaptor<SecurityContext> capturedContext = ArgumentCaptor.forClass(SecurityContext.class);
         verify(requestContext).setSecurityContext(capturedContext.capture());
         assertEquals("user1", capturedContext.getValue().getUserPrincipal().getName());
-        assertEquals(true, capturedContext.getValue().isSecure());
+        assertTrue(capturedContext.getValue().isSecure());
     }
 
     private String authHeader(String authorization, String username, String password) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index ebb604b2a50..33614e317c4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -575,11 +575,6 @@ public class BlockingConnectorTest {
         public void stop() {
         }
 
-        @Override
-        public Config validate(Map<String, String> connectorConfigs) {
-            return super.validate(connectorConfigs);
-        }
-
         @Override
         public ConfigDef config() {
             return Block.config();
@@ -700,11 +695,6 @@ public class BlockingConnectorTest {
         public void stop() {
         }
 
-        @Override
-        public Config validate(Map<String, String> connectorConfigs) {
-            return super.validate(connectorConfigs);
-        }
-
         @Override
         public ConfigDef config() {
             return Block.config();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
index f2f63264e36..e95a794739f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java
@@ -91,6 +91,7 @@ import static org.apache.kafka.connect.runtime.TopicCreationConfig.INCLUDE_REGEX
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_CONFIG;
 import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_CREATION_ENABLE_CONFIG;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -247,8 +248,8 @@ public class AbstractWorkerSourceTaskTest {
 
         workerTask.toSend = records;
         workerTask.sendRecords();
-        assertEquals(SERIALIZED_KEY, sent.getValue().key());
-        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
 
         PowerMock.verifyAll();
     }
@@ -339,8 +340,8 @@ public class AbstractWorkerSourceTaskTest {
 
         workerTask.toSend = records;
         workerTask.sendRecords();
-        assertEquals(SERIALIZED_KEY, sent.getValue().key());
-        assertEquals(SERIALIZED_RECORD, sent.getValue().value());
+        assertArrayEquals(SERIALIZED_KEY, sent.getValue().key());
+        assertArrayEquals(SERIALIZED_RECORD, sent.getValue().value());
         assertEquals(headers, sent.getValue().headers());
 
         PowerMock.verifyAll();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index 632f2d8f15f..1257834252e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -1072,7 +1072,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
             assertEquals(RECORDS.size(), metrics.currentMetricValueAsDouble(taskGroup, "batch-size-avg"), 0.000001d);
             assertTrue(pollRate > 0.0d);
         } else {
-            assertTrue(pollRate == 0.0d);
+            assertEquals(0.0d, pollRate, 0.0);
         }
         assertTrue(pollTotal >= minimumPollCountExpected);
 
@@ -1081,7 +1081,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         if (minimumPollCountExpected > 0) {
             assertTrue(writeRate > 0.0d);
         } else {
-            assertTrue(writeRate == 0.0d);
+            assertEquals(0.0d, writeRate, 0.0);
         }
         assertTrue(writeTotal >= minimumPollCountExpected);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
index 540c8a42694..e4f265ed4d7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/entities/PluginInfoTest.java
@@ -19,17 +19,17 @@ package org.apache.kafka.connect.runtime.rest.entities;
 import org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader;
 import org.junit.Test;
 
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
 public class PluginInfoTest {
 
     @Test
     public void testNoVersionFilter() {
         PluginInfo.NoVersionFilter filter = new PluginInfo.NoVersionFilter();
-        assertFalse(filter.equals("1.0"));
-        assertFalse(filter.equals(new Object()));
-        assertFalse(filter.equals(null));
-        assertTrue(filter.equals(DelegatingClassLoader.UNDEFINED_VERSION));
+        assertNotEquals("1.0", filter);
+        assertNotEquals(filter, new Object());
+        assertNotEquals(null, filter);
+        assertEquals(DelegatingClassLoader.UNDEFINED_VERSION, filter);
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
index d325e4f88cc..3f93ac52a89 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/BrokerHeartbeatManagerTest.java
@@ -41,6 +41,7 @@ import static org.apache.kafka.controller.BrokerControlState.SHUTDOWN_NOW;
 import static org.apache.kafka.controller.BrokerControlState.UNFENCED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -239,7 +240,7 @@ public class BrokerHeartbeatManagerTest {
     @Test
     public void testBrokerHeartbeatStateList() {
         BrokerHeartbeatStateList list = new BrokerHeartbeatStateList();
-        assertEquals(null, list.first());
+        assertNull(list.first());
         BrokerHeartbeatStateIterator iterator = list.iterator();
         assertFalse(iterator.hasNext());
         BrokerHeartbeatState broker0 = new BrokerHeartbeatState(0);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index db423340967..f85b1f921da 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -114,6 +114,7 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
 import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -314,8 +315,7 @@ public class QuorumControllerTest {
             int[] expectedIsr = {1};
             int[] isrFoo = active.replicationControl().getPartition(topicIdFoo, 0).isr;
 
-            assertTrue(Arrays.equals(isrFoo, expectedIsr),
-                "The ISR for topic foo was " + Arrays.toString(isrFoo) +
+            assertArrayEquals(isrFoo, expectedIsr, "The ISR for topic foo was " + Arrays.toString(isrFoo) +
                     ". It is expected to be " + Arrays.toString(expectedIsr));
 
             int fooLeader = active.replicationControl().getPartition(topicIdFoo, 0).leader;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 2d46b6ec46f..76f8ffb4548 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -242,7 +242,7 @@ public class ReplicationControlManagerTest {
         CreatableTopicResult createTestTopic(String name, int[][] replicas,
                                              Map<String, String> configs,
                                              short expectedErrorCode) throws Exception {
-            assertFalse(replicas.length == 0);
+            assertNotEquals(0, replicas.length);
             CreateTopicsRequestData request = new CreateTopicsRequestData();
             CreatableTopic topic = new CreatableTopic().setName(name);
             topic.setNumPartitions(-1).setReplicationFactor((short) -1);
@@ -284,7 +284,7 @@ public class ReplicationControlManagerTest {
 
         void createPartitions(int count, String name,
                 int[][] replicas, short expectedErrorCode) throws Exception {
-            assertFalse(replicas.length == 0);
+            assertNotEquals(0, replicas.length);
             CreatePartitionsTopic topic = new CreatePartitionsTopic().
                 setName(name).
                 setCount(count);
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ResultOrErrorTest.java b/metadata/src/test/java/org/apache/kafka/controller/ResultOrErrorTest.java
index 7d42b2edafb..c4c7ae4617f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ResultOrErrorTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ResultOrErrorTest.java
@@ -24,6 +24,8 @@ import org.junit.jupiter.api.Timeout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -35,7 +37,7 @@ public class ResultOrErrorTest {
             new ResultOrError<>(Errors.INVALID_REQUEST, "missing foobar");
         assertTrue(resultOrError.isError());
         assertFalse(resultOrError.isResult());
-        assertEquals(null, resultOrError.result());
+        assertNull(resultOrError.result());
         assertEquals(new ApiError(Errors.INVALID_REQUEST, "missing foobar"),
             resultOrError.error());
     }
@@ -46,20 +48,20 @@ public class ResultOrErrorTest {
         assertFalse(resultOrError.isError());
         assertTrue(resultOrError.isResult());
         assertEquals(123, resultOrError.result());
-        assertEquals(null, resultOrError.error());
+        assertNull(resultOrError.error());
     }
 
     @Test
     public void testEquals() {
         ResultOrError<String> a = new ResultOrError<>(Errors.INVALID_REQUEST, "missing foobar");
         ResultOrError<String> b = new ResultOrError<>("abcd");
-        assertFalse(a.equals(b));
-        assertFalse(b.equals(a));
-        assertTrue(a.equals(a));
-        assertTrue(b.equals(b));
+        assertNotEquals(a, b);
+        assertNotEquals(b, a);
+        assertEquals(a, a);
+        assertEquals(b, b);
         ResultOrError<String> c = new ResultOrError<>(Errors.INVALID_REQUEST, "missing baz");
-        assertFalse(a.equals(c));
-        assertFalse(c.equals(a));
-        assertTrue(c.equals(c));
+        assertNotEquals(a, c);
+        assertNotEquals(c, a);
+        assertEquals(c, c);
     }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
index 6ae64e6daa5..8268ec86091 100644
--- a/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
@@ -48,6 +48,7 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.REMOVE_TOPIC_R
 import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -396,7 +397,7 @@ public class TopicsImageTest {
         assertTrue(map.containsKey("bar"));
         assertEquals(BAR_UUID, map.get("bar"));
         assertFalse(map.containsKey("baz"));
-        assertEquals(null, map.get("baz"));
+        assertNull(map.get("baz"));
         HashSet<Uuid> uuids = new HashSet<>();
         map.values().iterator().forEachRemaining(u -> uuids.add(u));
         HashSet<Uuid> expectedUuids = new HashSet<>(Arrays.asList(
@@ -415,7 +416,7 @@ public class TopicsImageTest {
         assertTrue(map.containsKey(BAR_UUID));
         assertEquals("bar", map.get(BAR_UUID));
         assertFalse(map.containsKey(BAZ_UUID));
-        assertEquals(null, map.get(BAZ_UUID));
+        assertNull(map.get(BAZ_UUID));
         HashSet<String> names = new HashSet<>();
         map.values().iterator().forEachRemaining(n -> names.add(n));
         HashSet<String> expectedNames = new HashSet<>(Arrays.asList("foo", "bar"));
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java
index 2f870057e1d..55a5b3c2702 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/KafkaConfigSchemaTest.java
@@ -36,6 +36,7 @@ import static org.apache.kafka.common.config.ConfigResource.Type.TOPIC;
 import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
@@ -118,8 +119,8 @@ public class KafkaConfigSchemaTest {
     @Test
     public void testGetConfigValueDefault() {
         assertEquals("1", SCHEMA.getDefault(BROKER, "foo.bar"));
-        assertEquals(null, SCHEMA.getDefault(BROKER, "foo.baz.quux"));
-        assertEquals(null, SCHEMA.getDefault(TOPIC, "abc"));
+        assertNull(SCHEMA.getDefault(BROKER, "foo.baz.quux"));
+        assertNull(SCHEMA.getDefault(TOPIC, "abc"));
         assertEquals("true", SCHEMA.getDefault(TOPIC, "ghi"));
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index 3966054e744..b11b528fb9f 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -87,6 +87,7 @@ import static org.apache.kafka.server.authorizer.AuthorizationResult.ALLOWED;
 import static org.apache.kafka.server.authorizer.AuthorizationResult.DENIED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -280,16 +281,16 @@ public class StandardAuthorizerTest {
                 newFooAcl(op, ALLOW)));
         }
         // CREATE does not imply DESCRIBE
-        assertEquals(null, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
-            new MockAuthorizableRequestContext.Builder().
-                setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-            newFooAcl(CREATE, ALLOW)));
+        assertNull(findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
+                new MockAuthorizableRequestContext.Builder().
+                        setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
+                newFooAcl(CREATE, ALLOW)));
         // Deny ACLs don't do "implication".
         for (AclOperation op : asList(READ, WRITE, DELETE, ALTER)) {
-            assertEquals(null, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
-                new MockAuthorizableRequestContext.Builder().
-                    setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-                newFooAcl(op, DENY)));
+            assertNull(findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
+                    new MockAuthorizableRequestContext.Builder().
+                            setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
+                    newFooAcl(op, DENY)));
         }
         // Exact match
         assertEquals(DENIED, findResult(newAction(DESCRIBE, TOPIC, "foo_bar"),
@@ -304,10 +305,10 @@ public class StandardAuthorizerTest {
                 newFooAcl(op, ALLOW)));
         }
         // Deny ACLs don't do "implication".
-        assertEquals(null, findResult(newAction(DESCRIBE_CONFIGS, TOPIC, "foo_bar"),
-            new MockAuthorizableRequestContext.Builder().
-                setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
-            newFooAcl(ALTER_CONFIGS, DENY)));
+        assertNull(findResult(newAction(DESCRIBE_CONFIGS, TOPIC, "foo_bar"),
+                new MockAuthorizableRequestContext.Builder().
+                        setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
+                newFooAcl(ALTER_CONFIGS, DENY)));
         // Exact match
         assertEquals(DENIED, findResult(newAction(ALTER_CONFIGS, TOPIC, "foo_bar"),
             new MockAuthorizableRequestContext.Builder().
@@ -333,10 +334,10 @@ public class StandardAuthorizerTest {
                 setPrincipal(new KafkaPrincipal(USER_TYPE, "bob")).build(),
             newFooAcl(READ, ALLOW)));
         // Principal does not match.
-        assertEquals(null, findResult(newAction(READ, TOPIC, "foo_bar"),
-            new MockAuthorizableRequestContext.Builder().
-                setPrincipal(new KafkaPrincipal(USER_TYPE, "alice")).build(),
-            newFooAcl(READ, ALLOW)));
+        assertNull(findResult(newAction(READ, TOPIC, "foo_bar"),
+                new MockAuthorizableRequestContext.Builder().
+                        setPrincipal(new KafkaPrincipal(USER_TYPE, "alice")).build(),
+                newFooAcl(READ, ALLOW)));
         // Wildcard principal matches anything.
         assertEquals(DENIED, findResult(newAction(READ, GROUP, "bar"),
             new MockAuthorizableRequestContext.Builder().
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
index 4e6b29fd6de..dfa093478f1 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTest.java
@@ -33,6 +33,7 @@ import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
 import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
 import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 
 @Timeout(value = 40)
@@ -48,7 +49,7 @@ public class LocalLogManagerTest {
                 buildWithMockListeners()
         ) {
             env.close();
-            assertEquals(null, env.firstError.get());
+            assertNull(env.firstError.get());
         }
     }
 
@@ -63,7 +64,7 @@ public class LocalLogManagerTest {
         ) {
             assertEquals(new LeaderAndEpoch(OptionalInt.of(0), 1), env.waitForLeader());
             env.close();
-            assertEquals(null, env.firstError.get());
+            assertNull(env.firstError.get());
         }
     }
 
@@ -95,7 +96,7 @@ public class LocalLogManagerTest {
                 cur = next;
             } while (cur.leaderId().equals(first.leaderId()));
             env.close();
-            assertEquals(null, env.firstError.get());
+            assertNull(env.firstError.get());
         }
     }
 
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java
index a73357c5234..8d70ae74198 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/BaseHashTableTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.timeline;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.HashSet;
@@ -32,7 +33,7 @@ public class BaseHashTableTest {
     public void testEmptyTable() {
         BaseHashTable<Integer> table = new BaseHashTable<>(0);
         assertEquals(0, table.baseSize());
-        assertEquals(null, table.baseGet(Integer.valueOf(1)));
+        assertNull(table.baseGet(Integer.valueOf(1)));
     }
 
     @Test
@@ -57,17 +58,17 @@ public class BaseHashTableTest {
         Integer two = Integer.valueOf(2);
         Integer three = Integer.valueOf(3);
         Integer four = Integer.valueOf(4);
-        assertEquals(null, table.baseAddOrReplace(one));
-        assertEquals(null, table.baseAddOrReplace(two));
-        assertEquals(null, table.baseAddOrReplace(three));
+        assertNull(table.baseAddOrReplace(one));
+        assertNull(table.baseAddOrReplace(two));
+        assertNull(table.baseAddOrReplace(three));
         assertEquals(3, table.baseSize());
         assertEquals(one, table.baseGet(one));
         assertEquals(two, table.baseGet(two));
         assertEquals(three, table.baseGet(three));
-        assertEquals(null, table.baseGet(four));
+        assertNull(table.baseGet(four));
         assertEquals(one, table.baseRemove(one));
         assertEquals(2, table.baseSize());
-        assertEquals(null, table.baseGet(one));
+        assertNull(table.baseGet(one));
         assertEquals(2, table.baseSize());
     }
 
@@ -90,18 +91,18 @@ public class BaseHashTableTest {
         Foo three = new Foo();
         Foo four = new Foo();
         BaseHashTable<Foo> table = new BaseHashTable<>(20);
-        assertEquals(null, table.baseAddOrReplace(one));
-        assertEquals(null, table.baseAddOrReplace(two));
-        assertEquals(null, table.baseAddOrReplace(three));
+        assertNull(table.baseAddOrReplace(one));
+        assertNull(table.baseAddOrReplace(two));
+        assertNull(table.baseAddOrReplace(three));
         assertEquals(3, table.baseSize());
         assertEquals(one, table.baseGet(one));
         assertEquals(two, table.baseGet(two));
         assertEquals(three, table.baseGet(three));
-        assertEquals(null, table.baseGet(four));
+        assertNull(table.baseGet(four));
         assertEquals(one, table.baseRemove(one));
         assertEquals(three, table.baseRemove(three));
         assertEquals(1, table.baseSize());
-        assertEquals(null, table.baseGet(four));
+        assertNull(table.baseGet(four));
         assertEquals(two, table.baseGet(two));
         assertEquals(two, table.baseRemove(two));
         assertEquals(0, table.baseSize());
@@ -113,7 +114,7 @@ public class BaseHashTableTest {
 
         for (int i = 0; i < 4096; i++) {
             assertEquals(i, table.baseSize());
-            assertEquals(null, table.baseAddOrReplace(Integer.valueOf(i)));
+            assertNull(table.baseAddOrReplace(Integer.valueOf(i)));
         }
 
         for (int i = 0; i < 4096; i++) {
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index cbe413d6b54..2dca4e2d3ae 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Timeout;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -122,14 +124,14 @@ public class SnapshottableHashTableTest {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         SnapshottableHashTable<TestElement> table =
             new SnapshottableHashTable<>(registry, 1);
-        assertTrue(null == table.snapshottableAddOrReplace(E_1B));
+        assertNull(table.snapshottableAddOrReplace(E_1B));
         assertEquals(1, table.snapshottableSize(Long.MAX_VALUE));
         registry.getOrCreateSnapshot(0);
-        assertTrue(E_1B == table.snapshottableAddOrReplace(E_1A));
-        assertTrue(E_1B == table.snapshottableGet(E_1A, 0));
-        assertTrue(E_1A == table.snapshottableGet(E_1A, Long.MAX_VALUE));
-        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertSame(E_1B, table.snapshottableAddOrReplace(E_1A));
+        assertSame(E_1B, table.snapshottableGet(E_1A, 0));
+        assertSame(E_1A, table.snapshottableGet(E_1A, Long.MAX_VALUE));
+        assertNull(table.snapshottableAddOrReplace(E_2A));
+        assertNull(table.snapshottableAddOrReplace(E_3A));
         assertEquals(3, table.snapshottableSize(Long.MAX_VALUE));
         assertEquals(1, table.snapshottableSize(0));
         registry.getOrCreateSnapshot(1);
@@ -160,7 +162,7 @@ public class SnapshottableHashTableTest {
         assertIteratorYields(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
         assertEquals(E_1B, table.snapshottableRemove(E_1B));
         assertIteratorYields(table.snapshottableIterator(0), E_1B, E_2A, E_3A);
-        assertEquals(null, table.snapshottableRemove(E_1A));
+        assertNull(table.snapshottableRemove(E_1A));
         assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE), E_2A, E_3A);
         assertEquals(E_2A, table.snapshottableRemove(E_2A));
         assertEquals(E_3A, table.snapshottableRemove(E_3A));
@@ -172,7 +174,7 @@ public class SnapshottableHashTableTest {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         SnapshottableHashTable<TestElement> table =
             new SnapshottableHashTable<>(registry, 1);
-        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
+        assertNull(table.snapshottableAddOrReplace(E_1A));
         registry.getOrCreateSnapshot(0);
         Iterator<TestElement> iter = table.snapshottableIterator(0);
         assertTrue(table.snapshottableAddUnlessPresent(E_2A));
@@ -185,11 +187,11 @@ public class SnapshottableHashTableTest {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         SnapshottableHashTable<TestElement> table =
             new SnapshottableHashTable<>(registry, 1);
-        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertNull(table.snapshottableAddOrReplace(E_1A));
+        assertNull(table.snapshottableAddOrReplace(E_2A));
+        assertNull(table.snapshottableAddOrReplace(E_3A));
         assertEquals(E_1A, table.snapshottableRemove(E_1A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_1B));
+        assertNull(table.snapshottableAddOrReplace(E_1B));
         registry.getOrCreateSnapshot(0);
         Iterator<TestElement> iter = table.snapshottableIterator(0);
         List<TestElement> iterElements = new ArrayList<>();
@@ -208,9 +210,9 @@ public class SnapshottableHashTableTest {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         SnapshottableHashTable<TestElement> table =
             new SnapshottableHashTable<>(registry, 1);
-        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertNull(table.snapshottableAddOrReplace(E_1A));
+        assertNull(table.snapshottableAddOrReplace(E_2A));
+        assertNull(table.snapshottableAddOrReplace(E_3A));
         registry.getOrCreateSnapshot(0);
         assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
         assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
@@ -232,9 +234,9 @@ public class SnapshottableHashTableTest {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         SnapshottableHashTable<TestElement> table =
             new SnapshottableHashTable<>(registry, 1);
-        assertEquals(null, table.snapshottableAddOrReplace(E_1A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_2A));
-        assertEquals(null, table.snapshottableAddOrReplace(E_3A));
+        assertNull(table.snapshottableAddOrReplace(E_1A));
+        assertNull(table.snapshottableAddOrReplace(E_2A));
+        assertNull(table.snapshottableAddOrReplace(E_3A));
         registry.getOrCreateSnapshot(0);
         assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
         assertEquals(E_3A, table.snapshottableAddOrReplace(E_3B));
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
index afffd3dda4e..65413ab5cf4 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/TimelineHashMapTest.java
@@ -31,6 +31,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -91,7 +93,7 @@ public class TimelineHashMapTest {
     public void testMapMethods() {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         TimelineHashMap<Integer, String> map = new TimelineHashMap<>(registry, 1);
-        assertEquals(null, map.putIfAbsent(1, "xyz"));
+        assertNull(map.putIfAbsent(1, "xyz"));
         assertEquals("xyz", map.putIfAbsent(1, "123"));
         assertEquals("xyz", map.putIfAbsent(1, "ghi"));
         map.putAll(Collections.singletonMap(2, "b"));
@@ -104,12 +106,12 @@ public class TimelineHashMapTest {
     public void testMapEquals() {
         SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
         TimelineHashMap<Integer, String> map1 = new TimelineHashMap<>(registry, 1);
-        assertEquals(null, map1.putIfAbsent(1, "xyz"));
-        assertEquals(null, map1.putIfAbsent(2, "abc"));
+        assertNull(map1.putIfAbsent(1, "xyz"));
+        assertNull(map1.putIfAbsent(2, "abc"));
         TimelineHashMap<Integer, String> map2 = new TimelineHashMap<>(registry, 1);
-        assertEquals(null, map2.putIfAbsent(1, "xyz"));
-        assertFalse(map1.equals(map2));
-        assertEquals(null, map2.putIfAbsent(2, "abc"));
+        assertNull(map2.putIfAbsent(1, "xyz"));
+        assertNotEquals(map1, map2);
+        assertNull(map2.putIfAbsent(2, "abc"));
         assertEquals(map1, map2);
     }
 }
diff --git a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
index da3a7ec1081..8d3e5e3ce7e 100644
--- a/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/GlobComponentTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.shell;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.junit.jupiter.api.Test;
@@ -44,9 +45,9 @@ public class GlobComponentTest {
 
     @Test
     public void testToRegularExpression() {
-        assertEquals(null, GlobComponent.toRegularExpression("blah"));
-        assertEquals(null, GlobComponent.toRegularExpression(""));
-        assertEquals(null, GlobComponent.toRegularExpression("does not need a regex, actually"));
+        assertNull(GlobComponent.toRegularExpression("blah"));
+        assertNull(GlobComponent.toRegularExpression(""));
+        assertNull(GlobComponent.toRegularExpression("does not need a regex, actually"));
         assertEquals("^\\$blah.*$", GlobComponent.toRegularExpression("$blah*"));
         assertEquals("^.*$", GlobComponent.toRegularExpression("*"));
         assertEquals("^foo(?:(?:bar)|(?:baz))$", GlobComponent.toRegularExpression("foo{bar,baz}"));
diff --git a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
index 42223c78c80..72c0f7d21d3 100644
--- a/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
+++ b/shell/src/test/java/org/apache/kafka/shell/MetadataNodeTest.java
@@ -19,8 +19,8 @@ package org.apache.kafka.shell;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import org.apache.kafka.shell.MetadataNode.DirectoryNode;
 import org.apache.kafka.shell.MetadataNode.FileNode;
@@ -38,9 +38,9 @@ public class MetadataNodeTest {
         DirectoryNode root = new DirectoryNode();
         DirectoryNode defNode = root.mkdirs("abc", "def");
         DirectoryNode defNode2 = root.mkdirs("abc", "def");
-        assertTrue(defNode == defNode2);
+        assertSame(defNode, defNode2);
         DirectoryNode defNode3 = root.directory("abc", "def");
-        assertTrue(defNode == defNode3);
+        assertSame(defNode, defNode3);
         root.mkdirs("ghi");
         assertEquals(new HashSet<>(Arrays.asList("abc", "ghi")), root.children().keySet());
         assertEquals(Collections.singleton("def"), root.mkdirs("abc").children().keySet());
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index cecf23c9f8d..b8af14e319e 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -477,10 +477,6 @@ public class RemoteLogSegmentLifecycleTest {
             tearDown();
         }
 
-        @Override
-        public int brokerCount() {
-            return 3;
-        }
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
index 01e09746b0d..3197860d188 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTest.java
@@ -20,8 +20,8 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 public class KeyValueTest {
     @Rule
@@ -33,44 +33,44 @@ public class KeyValueTest {
         final KeyValue<String, Long> copyOfKV = KeyValue.pair(kv.key, kv.value);
 
         // Reflexive
-        assertTrue(kv.equals(kv));
-        assertTrue(kv.hashCode() == kv.hashCode());
+        assertEquals(kv, kv);
+        assertEquals(kv.hashCode(), kv.hashCode());
 
         // Symmetric
-        assertTrue(kv.equals(copyOfKV));
-        assertTrue(kv.hashCode() == copyOfKV.hashCode());
-        assertTrue(copyOfKV.hashCode() == kv.hashCode());
+        assertEquals(kv, copyOfKV);
+        assertEquals(kv.hashCode(), copyOfKV.hashCode());
+        assertEquals(copyOfKV.hashCode(), kv.hashCode());
 
         // Transitive
         final KeyValue<String, Long> copyOfCopyOfKV = KeyValue.pair(copyOfKV.key, copyOfKV.value);
-        assertTrue(copyOfKV.equals(copyOfCopyOfKV));
-        assertTrue(copyOfKV.hashCode() == copyOfCopyOfKV.hashCode());
-        assertTrue(kv.equals(copyOfCopyOfKV));
-        assertTrue(kv.hashCode() == copyOfCopyOfKV.hashCode());
+        assertEquals(copyOfKV, copyOfCopyOfKV);
+        assertEquals(copyOfKV.hashCode(), copyOfCopyOfKV.hashCode());
+        assertEquals(kv, copyOfCopyOfKV);
+        assertEquals(kv.hashCode(), copyOfCopyOfKV.hashCode());
 
         // Inequality scenarios
-        assertFalse("must be false for null", kv.equals(null));
-        assertFalse("must be false if key is non-null and other key is null", kv.equals(KeyValue.pair(null, kv.value)));
-        assertFalse("must be false if value is non-null and other value is null", kv.equals(KeyValue.pair(kv.key, null)));
+        assertNotEquals("must be false for null", null, kv);
+        assertNotEquals("must be false if key is non-null and other key is null", kv, KeyValue.pair(null, kv.value));
+        assertNotEquals("must be false if value is non-null and other value is null", kv, KeyValue.pair(kv.key, null));
         final KeyValue<Long, Long> differentKeyType = KeyValue.pair(1L, kv.value);
-        assertFalse("must be false for different key types", kv.equals(differentKeyType));
+        assertNotEquals("must be false for different key types", kv, differentKeyType);
         final KeyValue<String, String> differentValueType = KeyValue.pair(kv.key, "anyString");
-        assertFalse("must be false for different value types", kv.equals(differentValueType));
+        assertNotEquals("must be false for different value types", kv, differentValueType);
         final KeyValue<Long, String> differentKeyValueTypes = KeyValue.pair(1L, "anyString");
-        assertFalse("must be false for different key and value types", kv.equals(differentKeyValueTypes));
-        assertFalse("must be false for different types of objects", kv.equals(new Object()));
+        assertNotEquals("must be false for different key and value types", kv, differentKeyValueTypes);
+        assertNotEquals("must be false for different types of objects", kv, new Object());
 
         final KeyValue<String, Long> differentKey = KeyValue.pair(kv.key + "suffix", kv.value);
-        assertFalse("must be false if key is different", kv.equals(differentKey));
-        assertFalse("must be false if key is different", differentKey.equals(kv));
+        assertNotEquals("must be false if key is different", kv, differentKey);
+        assertNotEquals("must be false if key is different", differentKey, kv);
 
         final KeyValue<String, Long> differentValue = KeyValue.pair(kv.key, kv.value + 1L);
-        assertFalse("must be false if value is different", kv.equals(differentValue));
-        assertFalse("must be false if value is different", differentValue.equals(kv));
+        assertNotEquals("must be false if value is different", kv, differentValue);
+        assertNotEquals("must be false if value is different", differentValue, kv);
 
         final KeyValue<String, Long> differentKeyAndValue = KeyValue.pair(kv.key + "suffix", kv.value + 1L);
-        assertFalse("must be false if key and value are different", kv.equals(differentKeyAndValue));
-        assertFalse("must be false if key and value are different", differentKeyAndValue.equals(kv));
+        assertNotEquals("must be false if key and value are different", kv, differentKeyAndValue);
+        assertNotEquals("must be false if key and value are different", differentKeyAndValue, kv);
     }
 
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 17df2b59a21..57d5bcad162 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -976,8 +976,6 @@ public class StoreUpgradeIntegrationTest {
             store.put(record.key(), newCount);
         }
 
-        @Override
-        public void close() {}
     }
 
     private static class TimestampedKeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
@@ -1006,8 +1004,6 @@ public class StoreUpgradeIntegrationTest {
             store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp));
         }
 
-        @Override
-        public void close() {}
     }
 
     private static class WindowedProcessor implements Processor<Integer, Integer, Void, Void> {
@@ -1032,8 +1028,6 @@ public class StoreUpgradeIntegrationTest {
             store.put(record.key(), newCount, record.key() < 10 ? 0L : 100000L);
         }
 
-        @Override
-        public void close() {}
     }
 
     private static class TimestampedWindowedProcessor implements Processor<Integer, Integer, Void, Void> {
@@ -1062,7 +1056,5 @@ public class StoreUpgradeIntegrationTest {
             store.put(record.key(), ValueAndTimestamp.make(newCount, newTimestamp), record.key() < 10 ? 0L : 100000L);
         }
 
-        @Override
-        public void close() {}
     }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
index da5bca43b84..c989a3b1027 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
@@ -121,10 +121,6 @@ public class KStreamNewProcessorApiTest {
                     context().forward(record.withValue(record.value() + "Updated"));
                 }
 
-                @Override
-                public void close() {
-
-                }
             };
         }
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
index 1de78a8b85b..0084c684a48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.junit.Test;
@@ -53,14 +52,10 @@ public class GraphGraceSearchUtilTest {
             "stateful",
             new ProcessorParameters<>(
                 () -> new Processor<String, Long, String, Long>() {
-                    @Override
-                    public void init(final ProcessorContext<String, Long> context) {}
 
                     @Override
                     public void process(final Record<String, Long> record) {}
 
-                    @Override
-                    public void close() {}
                 },
                 "dummy"
             ),
@@ -139,14 +134,10 @@ public class GraphGraceSearchUtilTest {
             "stateful",
             new ProcessorParameters<>(
                 () -> new Processor<String, Long, String, Long>() {
-                    @Override
-                    public void init(final ProcessorContext<String, Long> context) {}
 
                     @Override
                     public void process(final Record<String, Long> record) {}
 
-                    @Override
-                    public void close() {}
                 },
                 "dummy"
             ),
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
index 987784bbf90..3bd822cee5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/TableProcessorNodeTest.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.junit.Test;
 
@@ -26,17 +25,11 @@ import static org.junit.Assert.assertTrue;
 
 public class TableProcessorNodeTest {
     private static class TestProcessor implements Processor<String, String, String, String> {
-        @Override
-        public void init(final ProcessorContext<String, String> context) {
-        }
 
         @Override
         public void process(final Record<String, String> record) {
         }
 
-        @Override
-        public void close() {
-        }
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
index 3cec0cc199e..9be8b3c58bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingInMemoryKeyValueStoreTest.java
@@ -36,7 +36,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -90,11 +89,6 @@ public class CachingInMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest
         store.init((StateStoreContext) context, null);
     }
 
-    @After
-    public void after() {
-        super.after();
-    }
-
     @SuppressWarnings("unchecked")
     @Override
     protected <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index a9c63b7336c..a85ddcfce56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -68,6 +68,7 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -265,7 +266,7 @@ public class MeteredTimestampedKeyValueStoreTest {
 
         final RawAndDeserializedValue<String> valueWithBinary = metered.getWithBinary(KEY);
         assertEquals(valueWithBinary.value, VALUE_AND_TIMESTAMP);
-        assertEquals(valueWithBinary.serializedValue, VALUE_AND_TIMESTAMP_BYTES);
+        assertArrayEquals(valueWithBinary.serializedValue, VALUE_AND_TIMESTAMP_BYTES);
     }
 
     @SuppressWarnings("resource")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index 805d29595bd..5fbe6dfbb1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -192,7 +192,7 @@ public class ThreadCacheTest {
         final Bytes key = Bytes.wrap(new byte[]{0});
 
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value());
+        assertArrayEquals(key.get(), cache.delete(namespace, key).value());
         assertNull(cache.get(namespace, key));
     }
 
@@ -203,7 +203,7 @@ public class ThreadCacheTest {
         final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
         cache.addDirtyEntryFlushListener(namespace, received::addAll);
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value());
+        assertArrayEquals(key.get(), cache.delete(namespace, key).value());
 
         // flushing should have no further effect
         cache.flush(namespace);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 30b0b585310..53b54c7226c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -244,9 +244,6 @@ public class TimeOrderedWindowStoreTest {
                     context.forward(record);
                 }
 
-                @Override
-                public void close() {
-                }
             }, "store-name");
 
         final Properties streamsConfiguration = new Properties();
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index c541650649e..a744bc12440 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -83,6 +83,7 @@ import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -545,8 +546,8 @@ public abstract class TopologyTestDriverTest {
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
         final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
 
-        assertEquals(key1, outputRecord.key());
-        assertEquals(value1, outputRecord.value());
+        assertArrayEquals(key1, outputRecord.key());
+        assertArrayEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
     }
 
@@ -760,13 +761,13 @@ public abstract class TopologyTestDriverTest {
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
         ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
-        assertEquals(key1, outputRecord.key());
-        assertEquals(value1, outputRecord.value());
+        assertArrayEquals(key1, outputRecord.key());
+        assertArrayEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
 
         outputRecord = testDriver.readRecord(SINK_TOPIC_2);
-        assertEquals(key1, outputRecord.key());
-        assertEquals(value1, outputRecord.value());
+        assertArrayEquals(key1, outputRecord.key());
+        assertArrayEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_2, outputRecord.topic());
     }
 
@@ -1488,8 +1489,8 @@ public abstract class TopologyTestDriverTest {
         pipeRecord(SOURCE_TOPIC_1, testRecord1);
 
         final ProducerRecord<byte[], byte[]> outputRecord = testDriver.readRecord(SINK_TOPIC_1);
-        assertEquals(key1, outputRecord.key());
-        assertEquals(value1, outputRecord.value());
+        assertArrayEquals(key1, outputRecord.key());
+        assertArrayEquals(value1, outputRecord.value());
         assertEquals(SINK_TOPIC_1, outputRecord.topic());
     }