You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:31 UTC
[flink] 04/07: [hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3b1976a5c8df28f737b04a7cb6cf96312d363c7d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 13:35:13 2019 +0100
[hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions
---
.../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 5 ++---
.../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 5 ++---
2 files changed, 4 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 539bc59..160adf5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -73,10 +73,10 @@ import static org.junit.Assert.fail;
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+ private final List<KafkaServer> brokers = new ArrayList<>();
private File tmpZkDir;
private File tmpKafkaParent;
private List<File> tmpKafkaDirs;
- private List<KafkaServer> brokers;
private TestingServer zookeeper;
private String zookeeperConnectionString;
private String brokerConnectionString = "";
@@ -258,14 +258,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
zookeeper = null;
- brokers = null;
+ brokers.clear();
zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
for (int i = 0; i < config.getKafkaServersNumber(); i++) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ad68d59..d6a7705 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -74,10 +74,10 @@ import static org.junit.Assert.fail;
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+ private final List<KafkaServer> brokers = new ArrayList<>();
private File tmpZkDir;
private File tmpKafkaParent;
private List<File> tmpKafkaDirs;
- private List<KafkaServer> brokers;
private TestingServer zookeeper;
private String zookeeperConnectionString;
private String brokerConnectionString = "";
@@ -117,14 +117,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
zookeeper = null;
- brokers = null;
+ brokers.clear();
zookeeper = new TestingServer(-1, tmpZkDir);
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
LOG.info("Starting KafkaServer");
- brokers = new ArrayList<>(config.getKafkaServersNumber());
ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
for (int i = 0; i < config.getKafkaServersNumber(); i++) {