You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/07 07:44:31 UTC

[flink] 04/07: [hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b1976a5c8df28f737b04a7cb6cf96312d363c7d
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Feb 8 13:35:13 2019 +0100

    [hotfix][kafka,test] Make brokers list final and avoid potential null pointer exceptions
---
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java   | 5 ++---
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java   | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 539bc59..160adf5 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -73,10 +73,10 @@ import static org.junit.Assert.fail;
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private final List<KafkaServer> brokers = new ArrayList<>();
 	private File tmpZkDir;
 	private File tmpKafkaParent;
 	private List<File> tmpKafkaDirs;
-	private List<KafkaServer> brokers;
 	private TestingServer zookeeper;
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
@@ -258,14 +258,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		zookeeper = null;
-		brokers = null;
+		brokers.clear();
 
 		zookeeper = new TestingServer(-1, tmpZkDir);
 		zookeeperConnectionString = zookeeper.getConnectString();
 		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 		LOG.info("Starting KafkaServer");
-		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
 		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ad68d59..d6a7705 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -74,10 +74,10 @@ import static org.junit.Assert.fail;
 public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 
 	protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
+	private final List<KafkaServer> brokers = new ArrayList<>();
 	private File tmpZkDir;
 	private File tmpKafkaParent;
 	private List<File> tmpKafkaDirs;
-	private List<KafkaServer> brokers;
 	private TestingServer zookeeper;
 	private String zookeeperConnectionString;
 	private String brokerConnectionString = "";
@@ -117,14 +117,13 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		}
 
 		zookeeper = null;
-		brokers = null;
+		brokers.clear();
 
 		zookeeper = new TestingServer(-1, tmpZkDir);
 		zookeeperConnectionString = zookeeper.getConnectString();
 		LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
 
 		LOG.info("Starting KafkaServer");
-		brokers = new ArrayList<>(config.getKafkaServersNumber());
 
 		ListenerName listenerName = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
 		for (int i = 0; i < config.getKafkaServersNumber(); i++) {