Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/25 11:53:14 UTC
[GitHub] [flink] snuyanzin commented on a diff in pull request #19808: [FLINK-27185][connectors] Convert connector-kafka module to assertj
snuyanzin commented on code in PR #19808:
URL: https://github.com/apache/flink/pull/19808#discussion_r881518844
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java:
##########
@@ -73,9 +72,9 @@ public void testPropertyHandling() {
.setKafkaProducerConfig(testConf)
.setProperty("k2", "correct"),
p -> {
- Arrays.stream(DEFAULT_KEYS).forEach(k -> assertTrue(k, p.containsKey(k)));
- assertEquals("v1", p.get("k1"));
- assertEquals("correct", p.get("k2"));
+ Arrays.stream(DEFAULT_KEYS).forEach(k -> assertThat(p).containsKey(k));
+ assertThat(p.get("k1")).isEqualTo("v1");
+ assertThat(p.get("k2")).isEqualTo("correct");
Review Comment:
Here `containsEntry` could be used, and the two assertions chained:
```suggestion
assertThat(p).containsEntry("k1", "v1").containsEntry("k2", "correct");
```
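Chaining works here because each assertion method returns the assertion object itself. A minimal plain-Java sketch of that fluent pattern follows; `MapCheck` and `checkThat` are hypothetical names made up for illustration and are not AssertJ's implementation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a fluent map assertion; not AssertJ's actual code. */
final class MapCheck<K, V> {
    private final Map<K, V> actual;

    private MapCheck(Map<K, V> actual) {
        this.actual = actual;
    }

    /** Entry point, playing the role that assertThat(...) plays in the suggestion. */
    static <K, V> MapCheck<K, V> checkThat(Map<K, V> actual) {
        return new MapCheck<>(actual);
    }

    /** Fails unless the map holds this key with this value; returns this so calls can be chained. */
    MapCheck<K, V> containsEntry(K key, V value) {
        if (actual == null) {
            throw new AssertionError("Expecting actual not to be null");
        }
        if (!actual.containsKey(key) || !Objects.equals(actual.get(key), value)) {
            throw new AssertionError(
                    "Expecting " + actual + " to contain entry " + key + "=" + value);
        }
        return this;
    }

    public static void main(String[] args) {
        Map<String, String> p = new HashMap<>();
        p.put("k1", "v1");
        p.put("k2", "correct");
        // Both checks run against the same map in a single statement.
        checkThat(p).containsEntry("k1", "v1").containsEntry("k2", "correct");
        System.out.println("both entries present");
    }
}
```

Compared with `assertThat(p.get("k1")).isEqualTo("v1")`, an entry-level check of this kind can report the whole map on failure rather than only the mismatching value.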
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java:
##########
@@ -161,8 +156,8 @@ public void testSerializeRecordWithKey() {
.setKeySerializationSchema(serializationSchema)
.build();
final ProducerRecord<byte[], byte[]> record = schema.serialize("a", null, null);
- assertArrayEquals(record.key(), serializationSchema.serialize("a"));
- assertArrayEquals(record.value(), serializationSchema.serialize("a"));
+ assertThat(serializationSchema.serialize("a")).isEqualTo(record.key());
+ assertThat(serializationSchema.serialize("a")).isEqualTo(record.value());
Review Comment:
These could be chained as:
```suggestion
assertThat(serializationSchema.serialize("a"))
.isEqualTo(record.key())
.isEqualTo(record.value());
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -157,26 +158,27 @@ public void testMultiplePartitionsPerConsumersFixedPartitions() {
List<KafkaTopicPartition> initialDiscovery =
partitionDiscoverer.discoverPartitions();
- assertTrue(initialDiscovery.size() >= minPartitionsPerConsumer);
- assertTrue(initialDiscovery.size() <= maxPartitionsPerConsumer);
+ assertThat(initialDiscovery.size())
+ .isGreaterThanOrEqualTo(minPartitionsPerConsumer);
+ assertThat(initialDiscovery.size()).isLessThanOrEqualTo(maxPartitionsPerConsumer);
Review Comment:
```suggestion
assertThat(initialDiscovery)
.hasSizeGreaterThanOrEqualTo(minPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxPartitionsPerConsumer);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
partitionDiscoverer.open();
List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
- assertEquals(2, discoveredPartitions1.size());
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+ assertThat(discoveredPartitions1).hasSize(2);
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));
List<KafkaTopicPartition> discoveredPartitions2 = partitionDiscoverer.discoverPartitions();
- assertEquals(3, discoveredPartitions2.size());
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 0)));
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 2)));
- assertTrue(discoveredPartitions2.contains(new KafkaTopicPartition("test-topic", 3)));
+ assertThat(discoveredPartitions2).hasSize(3);
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 0));
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 2));
+ assertThat(discoveredPartitions2).contains(new KafkaTopicPartition("test-topic", 3));
Review Comment:
```suggestion
assertThat(discoveredPartitions2)
.hasSize(3)
.contains(
new KafkaTopicPartition("test-topic", 0),
new KafkaTopicPartition("test-topic", 2),
new KafkaTopicPartition("test-topic", 3));
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriterTest.java:
##########
@@ -271,9 +270,9 @@ private void compareCompactedResult(
actualMap.computeIfAbsent(id, key -> new ArrayList<>()).add(rowData);
}
- assertEquals(expected.size(), actualMap.size());
+ assertThat(actualMap).hasSize(expected.size());
Review Comment:
```suggestion
assertThat(actualMap).hasSameSizeAs(expected);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
// the new partitions should have been considered as restored state
- assertTrue(consumerFunction.getRestoredState() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
for (Map.Entry<KafkaTopicPartition, Long> expectedEntry :
expectedSubscribedPartitionsWithStartOffsets.entrySet()) {
- assertEquals(
- expectedEntry.getValue(),
- consumerFunction.getRestoredState().get(expectedEntry.getKey()));
+ assertThat(consumerFunction.getRestoredState().get(expectedEntry.getKey()))
+ .isEqualTo(expectedEntry.getValue());
Review Comment:
```suggestion
assertThat(consumerFunction.getRestoredState()).containsEntry(
expectedEntry.getKey(),
expectedEntry.getValue());
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
new DummyFlinkKafkaProducer<>(
props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
- assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
+ assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
Review Comment:
```suggestion
assertThat(props)
.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -220,25 +222,25 @@ public void testPartitionsFewerThanConsumersFixedPartitions() {
List<KafkaTopicPartition> initialDiscovery =
partitionDiscoverer.discoverPartitions();
- assertTrue(initialDiscovery.size() <= 1);
+ assertThat(initialDiscovery.size()).isLessThanOrEqualTo(1);
Review Comment:
```suggestion
assertThat(initialDiscovery).hasSizeLessThanOrEqualTo(1);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
AdminClient adminClient =
(AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
- assertNotNull(adminClient);
+ assertThat(adminClient).isNotNull();
String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
- assertEquals(
- defaultTimeoutMs,
- Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
+ assertThat(Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"))
+ .isEqualTo(defaultTimeoutMs);
+
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
Review Comment:
```suggestion
assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
```
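The reason for asserting on the key set itself rather than on the boolean from `containsAll(...)`: a boolean assertion can only report "expected true but was false", whereas a collection-level assertion can name the missing elements. A rough plain-Java sketch of that idea, using hypothetical helper names (`missingFrom`, `assertContainsAll`) that are not part of AssertJ:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical sketch of a collection-level containment check; not AssertJ's actual code. */
final class ContainsAllSketch {

    /** Returns the expected elements that are absent from actual, in iteration order. */
    static <T> List<T> missingFrom(Collection<T> actual, Collection<? extends T> expected) {
        List<T> missing = new ArrayList<>();
        for (T element : expected) {
            if (!actual.contains(element)) {
                missing.add(element);
            }
        }
        return missing;
    }

    /** Fails with a message that lists the missing elements instead of a bare boolean. */
    static <T> void assertContainsAll(Collection<T> actual, Collection<? extends T> expected) {
        List<T> missing = missingFrom(actual, expected);
        if (!missing.isEmpty()) {
            throw new AssertionError(
                    "Expecting " + actual + " to contain " + expected
                            + " but could not find " + missing);
        }
    }

    public static void main(String[] args) {
        List<Integer> partitionIds = Arrays.asList(0, 1, 2);
        assertContainsAll(partitionIds, Arrays.asList(1, 2));
        try {
            assertContainsAll(partitionIds, Arrays.asList(1, 3));
        } catch (AssertionError e) {
            // The message names the element that was not found.
            System.out.println(e.getMessage());
        }
    }
}
```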
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.EARLIEST);
}
@Test
public void testLatestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.latest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
assertThat(offsets.keySet()).containsAll(partitions);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -314,33 +316,39 @@ public void testGrowingPartitions() {
List<KafkaTopicPartition> initialDiscoverySubtask2 =
partitionDiscovererSubtask2.discoverPartitions();
- assertTrue(initialDiscoverySubtask0.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask0.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask1.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask1.size() <= maxInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask2.size() >= minInitialPartitionsPerConsumer);
- assertTrue(initialDiscoverySubtask2.size() <= maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask0.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask0.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask1.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask1.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask2.size())
+ .isGreaterThanOrEqualTo(minInitialPartitionsPerConsumer);
+ assertThat(initialDiscoverySubtask2.size())
+ .isLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
Review Comment:
```suggestion
assertThat(initialDiscoverySubtask0)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
assertThat(initialDiscoverySubtask1)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
assertThat(initialDiscoverySubtask2)
.hasSizeGreaterThanOrEqualTo(minInitialPartitionsPerConsumer)
.hasSizeLessThanOrEqualTo(maxInitialPartitionsPerConsumer);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java:
##########
@@ -304,16 +298,15 @@ public void testKafkaClientProperties() throws Exception {
AdminClient adminClient =
(AdminClient) Whitebox.getInternalState(enumerator, "adminClient");
- assertNotNull(adminClient);
+ assertThat(adminClient).isNotNull();
String clientId = (String) Whitebox.getInternalState(adminClient, "clientId");
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
- assertEquals(
- defaultTimeoutMs,
- Whitebox.getInternalState(adminClient, "defaultApiTimeoutMs"));
-
- assertNotNull(clientId);
- assertTrue(clientId.startsWith(clientIdPrefix));
+ assertThat(clientId).isNotNull();
+ assertThat(clientId).startsWith(clientIdPrefix);
Review Comment:
```suggestion
assertThat(clientId).isNotNull().startsWith(clientIdPrefix);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/initializer/OffsetsInitializerTest.java:
##########
@@ -65,36 +63,38 @@ public void testEarliestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.earliest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- Assert.assertEquals(KafkaPartitionSplit.EARLIEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.EARLIEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.EARLIEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.EARLIEST);
}
@Test
public void testLatestOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.latest();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
- assertTrue(offsets.keySet().containsAll(partitions));
+ assertThat(offsets).hasSize(partitions.size());
+ assertThat(offsets.keySet().containsAll(partitions)).isTrue();
for (long offset : offsets.values()) {
- assertEquals(KafkaPartitionSplit.LATEST_OFFSET, offset);
+ assertThat(offset).isEqualTo(KafkaPartitionSplit.LATEST_OFFSET);
}
- assertEquals(OffsetResetStrategy.LATEST, initializer.getAutoOffsetResetStrategy());
+ assertThat(initializer.getAutoOffsetResetStrategy()).isEqualTo(OffsetResetStrategy.LATEST);
}
@Test
public void testCommittedGroupOffsetsInitializer() {
OffsetsInitializer initializer = OffsetsInitializer.committedOffsets();
List<TopicPartition> partitions = KafkaSourceTestEnv.getPartitionsForTopic(TOPIC);
Map<TopicPartition, Long> offsets = initializer.getPartitionOffsets(partitions, retriever);
- assertEquals(partitions.size(), offsets.size());
+ assertThat(offsets).hasSize(partitions.size());
Review Comment:
```suggestion
assertThat(offsets).hasSameSizeAs(partitions);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -222,11 +221,11 @@ public void testRestoreFromEmptyStateNoPartitions() throws Exception {
testHarness.open();
// assert that no partitions were found and is empty
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
Review Comment:
Under the hood, `isEmpty()` already asserts that the actual value is not null, so the separate `isNotNull()` check could be omitted:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isEmpty();
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java:
##########
@@ -99,12 +99,12 @@ public static void comparedWithKeyAndOrder(
actualData.computeIfAbsent(key, k -> new LinkedList<>()).add(row);
}
// compare key first
- assertEquals("Actual result: " + actual, expectedData.size(), actualData.size());
+ assertThat(actualData).as("Actual result: " + actual).hasSize(expectedData.size());
Review Comment:
```suggestion
assertThat(actualData).as("Actual result: " + actual).hasSameSizeAs(expectedData);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
// the new partitions should have been considered as restored state
- assertTrue(consumerFunction.getRestoredState() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
Review Comment:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotEmpty();
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -275,20 +274,18 @@ public void testRestoreFromEmptyStateWithPartitions() throws Exception {
}
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
- assertEquals(
- expectedSubscribedPartitionsWithStartOffsets,
- consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
Review Comment:
Under the hood, both `isNotEmpty()` and `isEmpty()` already assert that the actual value is not null, so `isNotNull()` is redundant. The remaining assertions could also be chained:
```suggestion
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
.isNotEmpty()
.isEqualTo(expectedSubscribedPartitionsWithStartOffsets);
```
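To illustrate why the separate null check is redundant, here is a small plain-Java sketch of the described behaviour: the emptiness checks reject `null` before looking at the contents. The class and method names are hypothetical and this is not AssertJ's code:

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical sketch of null-rejecting emptiness checks; not AssertJ's actual code. */
final class EmptinessCheckSketch {

    /** Rejects null first, then requires the map to be empty. */
    static void assertEmpty(Map<?, ?> actual) {
        requireNotNull(actual);
        if (!actual.isEmpty()) {
            throw new AssertionError("Expecting empty but was: " + actual);
        }
    }

    /** Rejects null first, then requires at least one entry. */
    static void assertNotEmpty(Map<?, ?> actual) {
        requireNotNull(actual);
        if (actual.isEmpty()) {
            throw new AssertionError("Expecting actual not to be empty");
        }
    }

    private static void requireNotNull(Object actual) {
        if (actual == null) {
            throw new AssertionError("Expecting actual not to be null");
        }
    }

    public static void main(String[] args) {
        assertEmpty(Collections.emptyMap());
        assertNotEmpty(Collections.singletonMap("test-topic", 0L));
        try {
            assertNotEmpty(null);
        } catch (AssertionError e) {
            // A null map fails the emptiness check on its own.
            System.out.println("null rejected: " + e.getMessage());
        }
    }
}
```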
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractPartitionDiscovererTest.java:
##########
@@ -507,15 +512,15 @@ public void testNonContiguousPartitionIdDiscovery() throws Exception {
partitionDiscoverer.open();
List<KafkaTopicPartition> discoveredPartitions1 = partitionDiscoverer.discoverPartitions();
- assertEquals(2, discoveredPartitions1.size());
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 1)));
- assertTrue(discoveredPartitions1.contains(new KafkaTopicPartition("test-topic", 4)));
+ assertThat(discoveredPartitions1).hasSize(2);
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 1));
+ assertThat(discoveredPartitions1).contains(new KafkaTopicPartition("test-topic", 4));
Review Comment:
```suggestion
assertThat(discoveredPartitions1)
.hasSize(2)
.contains(
new KafkaTopicPartition("test-topic", 1),
new KafkaTopicPartition("test-topic", 4));
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
testHarness.open();
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
- assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(PARTITION_STATE);
// assert that state is correctly restored from legacy checkpoint
- assertTrue(consumerFunction.getRestoredState() != null);
- assertEquals(PARTITION_STATE, consumerFunction.getRestoredState());
+ assertThat(consumerFunction.getRestoredState()).isNotNull();
+ assertThat(consumerFunction.getRestoredState()).isEqualTo(PARTITION_STATE);
Review Comment:
```suggestion
assertThat(consumerFunction.getRestoredState()).isNotNull().isEqualTo(PARTITION_STATE);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java:
##########
@@ -325,15 +322,16 @@ public void testRestore() throws Exception {
testHarness.open();
// assert that there are partitions and is identical to expected list
- assertTrue(consumerFunction.getSubscribedPartitionsToStartOffsets() != null);
- assertTrue(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets()).isNotNull();
+ assertThat(!consumerFunction.getSubscribedPartitionsToStartOffsets().isEmpty()).isTrue();
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
- assertEquals(PARTITION_STATE, consumerFunction.getSubscribedPartitionsToStartOffsets());
+ assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
+ .isEqualTo(PARTITION_STATE);
Review Comment:
All these lines could be joined into something like this:
```java
assertThat(consumerFunction.getSubscribedPartitionsToStartOffsets())
.isNotEmpty()
// on restore, subscribedPartitionsToStartOffsets should be identical to the restored state
.isEqualTo(PARTITION_STATE);
```
##########
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java:
##########
@@ -82,14 +82,16 @@ public void testKeyValueDeserializersSetIfMissing() throws Exception {
new DummyFlinkKafkaProducer<>(
props, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), null);
- assertTrue(props.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
- assertTrue(props.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
- assertTrue(
- props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
- .equals(ByteArraySerializer.class.getName()));
+ assertThat(props).containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ assertThat(props).containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ assertThat(
+ props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+ .equals(ByteArraySerializer.class.getName()))
+ .isTrue();
+ assertThat(
+ props.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
+ .equals(ByteArraySerializer.class.getName()))
+ .isTrue();
Review Comment:
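Is it expected to have these lines twice? Both assertions check `ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG`; perhaps the second one was meant to check `ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG`.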