Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/25 11:53:14 UTC

[GitHub] [flink] snuyanzin commented on a diff in pull request #19808: [FLINK-27185][connectors] Convert connector-kafka module to assertj

snuyanzin commented on code in PR #19808:
URL: https://github.com/apache/flink/pull/19808#discussion_r881518844


##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -73,9 +72,9 @@ public void testPropertyHandling() {
                         .setKafkaProducerConfig(testConf)
                         .setProperty("k2", "correct"),
                 p -> {
-                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
-                    assertEquals("v1", p.get("k1"));
-                    assertEquals("correct", p.get("k2"));
+                    Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
+                    assertThat(p.get("k1")).isEqualTo("v1");
+                    assertThat(p.get("k2")).isEqualTo("correct");

Review Comment:
   `containsEntry` could be used here, and the two assertions chained:
   ```suggestion
                       assertThat(p).containsEntry("k1", "v1").containsEntry("k2", "correct");
   ```
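
   The chained form works because each assertion method returns the assertion object itself. As a rough, library-free sketch of that idea (the `ChainedMapCheck` class and its `check` factory are hypothetical stand-ins invented for illustration, not AssertJ's implementation):

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Objects;

   // Hypothetical stand-in for a fluent map assertion; this is not AssertJ's code.
   final class ChainedMapCheck<K, V> {
       private final Map<K, V> actual;

       private ChainedMapCheck(Map<K, V> actual) {
           this.actual = actual;
       }

       static <K, V> ChainedMapCheck<K, V> check(Map<K, V> actual) {
           return new ChainedMapCheck<>(actual);
       }

       // Fails unless the key is present and mapped to the expected value.
       ChainedMapCheck<K, V> containsEntry(K key, V value) {
           if (actual == null) {
               throw new AssertionError("expecting actual not to be null");
           }
           if (!actual.containsKey(key) || !Objects.equals(actual.get(key), value)) {
               throw new AssertionError(
                       "expecting " + actual + " to contain entry " + key + "=" + value);
           }
           return this; // returning the same object is what lets the next call be chained
       }

       public static void main(String[] args) {
           Map<String, String> p = new HashMap<>();
           p.put("k1", "v1");
           p.put("k2", "correct");
           check(p).containsEntry("k1", "v1").containsEntry("k2", "correct");
           System.out.println("both entries present");
       }
   }
   ```

   A single entry check also covers key presence and value equality in one step, which is why it can replace the separate `get(...)` comparisons.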



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -161,8 +156,8 @@ public void testSerializeRecordWithKey() {
                         .setKeySerializationSchema(serializationSchema)
                         .build();
         final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
-        assertArrayEquals(record.key(), serializationSchema.serialize("a"));
-        assertArrayEquals(record.value(), serializationSchema.serialize("a"));
+        assertThat(serializationSchema.serialize("a")).isEqualTo(record.key());
+        assertThat(serializationSchema.serialize("a")).isEqualTo(record.value());

Review Comment:
   These could be chained as:
   ```suggestion
           assertThat(serializationSchema.serialize("a"))
                   .isEqualTo(record.key())
                   .isEqualTo(record.value());
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -157,26 +158,27 @@ public void testMultiplePartitionsPerConsumersFixedPartitions() {
 
                 List<KafkaTopicPartition> initialDiscovery =
                         partitionDiscoverer.discoverPartitions();
-                assertTrue(initialDiscovery.size() >= minPartitionsPerConsumer);
-                assertTrue(initialDiscovery.size() <= maxPartitionsPerConsumer);
+                assertThat(initialDiscovery.size())
+                        .isGreaterThanOrEqualTo(minPartitionsPerConsumer);
+                assertThat(initialDiscovery.size()).isLessThanOrEqualTo(maxPartitionsPerConsumer);

Review Comment:
   ```suggestion
                   assertThat(initialDiscovery)
                           .hasSizeGreaterThanOrEqualTo(minPartitionsPerConsumer)
                           .hasSizeLessThanOrEqualTo(maxPartitionsPerConsumer);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
         partitionDiscoverer.open();
 
         List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
-        assertEquals(2, discoveredPartitions1.size());
-        assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
-        assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+        assertThat(discoveredPartitions1).hasSize(2);
+        assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+        assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));
 
         List<KafkaTopicPartition> discoveredPartitions2 = partitionDiscoverer.discoverPartitions();
-        assertEquals(3, discoveredPartitions2.size());
-        assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 0)));
-        assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 2)));
-        assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 3)));
+        assertThat(discoveredPartitions2).hasSize(3);
+        assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 0));
+        assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 2));
+        assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 3));

Review Comment:
   ```suggestion
           assertThat(discoveredPartitions2)
                   .hasSize(3)
                   .contains(
                           new KafkaTopicPartition("test-topic", 0),
                           new KafkaTopicPartition("test-topic", 2),
                           new KafkaTopicPartition("test-topic", 3));
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -271,9 +270,9 @@ private void compareCompactedResult(
             actualMap.computeIfAbsent(id, key -> new ArrayList<>()).add(rowData);
         }
 
-        assertEquals(expected.size(), actualMap.size());
+        assertThat(actualMap).hasSize(expected.size());

Review Comment:
   ```suggestion
           assertThat(actualMap).hasSameSizeAs(expected);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
 
         // the new partitions should have been considered as restored state
-        assertTrue(consumerFunction.getRestoredState() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
         for (Map.Entry<KafkaTopicPartition, Long> expectedEntry :
                 expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
-            assertEquals(
-                    expectedEntry.getValue(),
-                    consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+            assertThat(consumerFunction.getRestoredState().get(expectedEntry.getKey()))
+                    .isEqualTo(expectedEntry.getValue());

Review Comment:
   ```suggestion
               assertThat(consumerFunction.getRestoredState()).containsEntry(
                       expectedEntry.getKey(),
                       expectedEntry.getValue());
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
         new DummyFlinkKafkaProducer<>(
                 props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
-        assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
+        assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);

Review Comment:
   ```suggestion
           assertThat(props)
                   .containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
                   .containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -220,25 +222,25 @@ public void testPartitionsFewerThanConsumersFixedPartitions() {
 
                 List<KafkaTopicPartition> initialDiscovery =
                         partitionDiscoverer.discoverPartitions();
-                assertTrue(initialDiscovery.size() <= 1);
+                assertThat(initialDiscovery.size()).isLessThanOrEqualTo(1);

Review Comment:
   ```suggestion
                   assertThat(initialDiscovery).hasSizeLessThanOrEqualTo(1);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
 
             AdminClient adminClient =
                     (AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
-            assertNotNull(adminClient);
+            assertThat(adminClient).isNotNull();
             String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    defaultTimeoutMs,
-                    Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);
+            assertThat(Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"))
+                    .isEqualTo(defaultTimeoutMs);
+
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);

Review Comment:
   ```suggestion
               assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
           assertThat(offsets.keySet()).containsAll(partitions);
   ```
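
   One likely benefit of asserting on the collection rather than on the boolean result of `containsAll` is a more useful failure message: a boolean check can only report that the value was false, while a collection check can name what is missing. A minimal, library-free sketch of that difference (the `ContainsAllCheck` helper is hypothetical and is not part of AssertJ or Flink):

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collection;
   import java.util.List;

   // Hypothetical helper for illustration; not part of AssertJ or Flink.
   final class ContainsAllCheck {
       // Returns the expected elements that are absent from actual, in iteration order.
       static <T> List<T> missing(Collection<T> actual, Collection<T> expected) {
           List<T> result = new ArrayList<>();
           for (T element : expected) {
               if (!actual.contains(element)) {
                   result.add(element);
               }
           }
           return result;
       }

       // Fails with a message that names the missing elements, unlike a bare boolean check.
       static <T> void assertContainsAll(Collection<T> actual, Collection<T> expected) {
           List<T> missing = missing(actual, expected);
           if (!missing.isEmpty()) {
               throw new AssertionError(
                       "expecting " + actual + " to contain " + expected
                               + " but could not find " + missing);
           }
       }

       public static void main(String[] args) {
           List<Integer> keys = Arrays.asList(0, 1, 2);
           assertContainsAll(keys, Arrays.asList(0, 2));
           System.out.println(missing(keys, Arrays.asList(2, 3, 4)));
       }
   }
   ```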



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+        assertThat(initializer.getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.EARLIEST);
     }
 
     @Test
     public void testLatestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.latest();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
           assertThat(offsets.keySet()).containsAll(partitions);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -314,33 +316,39 @@ public void testGrowingPartitions() {
             List<KafkaTopicPartition> initialDiscoverySubtask2 =
                     partitionDiscovererSubtask2.discoverPartitions();
 
-            assertTrue(initialDiscoverySubtask0.size() >= minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask0.size() <= maxInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask1.size() >= minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask1.size() <= maxInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask2.size() >= minInitialPartitionsPerConsumer);
-            assertTrue(initialDiscoverySubtask2.size() <= maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask0.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask0.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask1.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask1.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask2.size())
+                    .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+            assertThat(initialDiscoverySubtask2.size())
+                    .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);

Review Comment:
   ```suggestion
               assertThat(initialDiscoverySubtask0)
                       .hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       .hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
               assertThat(initialDiscoverySubtask1)
                       .hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       .hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
               assertThat(initialDiscoverySubtask2)
                       .hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
                       .hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
 
             AdminClient adminClient =
                     (AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
-            assertNotNull(adminClient);
+            assertThat(adminClient).isNotNull();
             String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
-            assertEquals(
-                    defaultTimeoutMs,
-                    Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
-            assertNotNull(clientId);
-            assertTrue(clientId.startsWith(clientIdPrefix));
+            assertThat(clientId).isNotNull();
+            assertThat(clientId).startsWith(clientIdPrefix);

Review Comment:
   ```suggestion
               assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.earliest();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+        assertThat(initializer.getAutoOffsetResetStrategy())
+                .isEqualTo(OffsetResetStrategy.EARLIEST);
     }
 
     @Test
     public void testLatestOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.latest();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
-        assertTrue(offsets.keySet().containsAll(partitions));
+        assertThat(offsets).hasSize(partitions.size());
+        assertThat(offsets.keySet().containsAll(partitions)).isTrue();
         for (long offset : offsets.values()) {
-            assertEquals(KafkaPartitionSplit.LATEST_OFFSET, offset);
+            assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
         }
-        assertEquals(OffsetResetStrategy.LATEST, initializer.getAutoOffsetResetStrategy());
+        assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
     }
 
     @Test
     public void testCommittedGroupOffsetsInitializer() {
         OffsetsInitializer initializer = OffsetsInitializer.committedOffsets();
         List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
         Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
-        assertEquals(partitions.size(), offsets.size());
+        assertThat(offsets).hasSize(partitions.size());

Review Comment:
   ```suggestion
           assertThat(offsets).hasSameSizeAs(partitions);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -222,11 +221,11 @@ public void testRestoreFromEmptyStateNoPartitions() throws Exception {
         testHarness.open();
 
         // assert that no partitions were found and is empty
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();

Review Comment:
   Under the hood, `isEmpty()` already checks that the value is not null, so the separate `isNotNull()` assertion could be omitted:
   ```suggestion
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
   ```
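
   To illustrate why the extra null check is redundant, here is a small library-free sketch in which the emptiness check performs the null check first. The `NullCheckingMapAssert` class is a hypothetical stand-in written for this note; it only mirrors the behavior described above and is not AssertJ's actual code.

   ```java
   import java.util.Collections;
   import java.util.Map;

   // Hypothetical stand-in for a fluent map assertion; not AssertJ's actual code.
   final class NullCheckingMapAssert {
       private final Map<?, ?> actual;

       private NullCheckingMapAssert(Map<?, ?> actual) {
           this.actual = actual;
       }

       static NullCheckingMapAssert check(Map<?, ?> actual) {
           return new NullCheckingMapAssert(actual);
       }

       NullCheckingMapAssert isNotNull() {
           if (actual == null) {
               throw new AssertionError("expecting actual not to be null");
           }
           return this;
       }

       NullCheckingMapAssert isEmpty() {
           isNotNull(); // a null map fails here, so a separate isNotNull() call adds nothing
           if (!actual.isEmpty()) {
               throw new AssertionError("expecting empty but was " + actual);
           }
           return this;
       }

       public static void main(String[] args) {
           check(Collections.emptyMap()).isEmpty();
           try {
               check(null).isEmpty();
           } catch (AssertionError e) {
               System.out.println("null map rejected: " + e.getMessage());
           }
       }
   }
   ```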



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java:
##########
@@ -99,12 +99,12 @@ public static void comparedWithKeyAndOrder(
             actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row);
         }
         // compare key first
-        assertEquals("Actual result: " + actual, expectedData.size(), actualData.size());
+        assertThat(actualData).as("Actual result: " + actual).hasSize(expectedData.size());

Review Comment:
   ```suggestion
           assertThat(actualData).as("Actual result: " + actual).hasSameSizeAs(expectedData);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
 
         // the new partitions should have been considered as restored state
-        assertTrue(consumerFunction.getRestoredState() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();

Review Comment:
   ```suggestion
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotEmpty();
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
         }
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
-        assertEquals(
-                expectedSubscribedPartitionsWithStartOffsets,
-                consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);

Review Comment:
   Under the hood, `isNotEmpty()` and `isEmpty()` already check that the value is not null, so `isNotNull()` is redundant; the remaining assertions could also be chained:
   ```suggestion
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
                   .isNotEmpty()
                   .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
         partitionDiscoverer.open();
 
         List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
-        assertEquals(2, discoveredPartitions1.size());
-        assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
-        assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+        assertThat(discoveredPartitions1).hasSize(2);
+        assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+        assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));

Review Comment:
   ```suggestion
           assertThat(discoveredPartitions1)
                   .hasSize(2)
                   .contains(
                           new KafkaTopicPartition("test-topic", 1),
                           new KafkaTopicPartition("test-topic", 4));
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
         testHarness.open();
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
 
         // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
-        assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(PARTITION_STATE);
 
         // assert that state is correctly restored from legacy checkpoint
-        assertTrue(consumerFunction.getRestoredState() != null);
-        assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+        assertThat(consumerFunction.getRestoredState()).isNotNull();
+        assertThat(consumerFunction.getRestoredState()).isEqualTo(PARTITION_STATE);

Review Comment:
   ```suggestion
           assertThat(consumerFunction.getRestoredState()).isNotNull().isEqualTo(PARTITION_STATE);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
         testHarness.open();
 
         // assert that there are partitions and is identical to expected list
-        assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
-        assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+        assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
 
         // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
-        assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+        assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+                .isEqualTo(PARTITION_STATE);

Review Comment:
   All of these lines could be joined into something like this:
   ```java
           assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
                   .isNotEmpty()
                   // on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
                   .isEqualTo(PARTITION_STATE);
   ```



##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
         new DummyFlinkKafkaProducer<>(
                 props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
 
-        assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
-        assertTrue(
-                props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                        .equals(ByteArraySerializer.class.getName()));
+        assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        assertThat(
+                        props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                                .equals(ByteArraySerializer.class.getName()))
+                .isTrue();
+        assertThat(
+                        props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+                                .equals(ByteArraySerializer.class.getName()))
+                .isTrue();

Review Comment:
   Is it intentional that these lines appear twice? Both assertions check `KEY_SERIALIZER_CLASS_CONFIG`.


