You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/19 08:57:31 UTC

[flink] 01/02: [FLINK-18343][e2e] Refactor file-line replacement into separate method

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit febe8e30bcf933647221248278068a4a6ecc3c5e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 17 13:08:36 2020 +0200

    [FLINK-18343][e2e] Refactor file-line replacement into separate method
---
 .../util/kafka/LocalStandaloneKafkaResource.java   | 28 ++++----
 .../apache/flink/tests/util/util/FileUtils.java    | 58 +++++++++++++++++
 .../flink/tests/util/util/FileUtilsTest.java       | 76 ++++++++++++++++++++++
 3 files changed, 145 insertions(+), 17 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index e1a3ff4..06af5be 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -24,6 +24,7 @@ import org.apache.flink.tests.util.CommandLineWrapper;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.util.FileUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.junit.rules.TemporaryFolder;
@@ -33,15 +34,12 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.PrintStream;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -115,22 +113,18 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			.build());
 
 		LOG.info("Updating ZooKeeper properties");
-		final Path zookeeperPropertiesFile = kafkaDir.resolve(Paths.get("config", "zookeeper.properties"));
-		final List<String> zookeeperPropertiesFileLines = Files.readAllLines(zookeeperPropertiesFile);
-		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-			zookeeperPropertiesFileLines.stream()
-				.map(line -> ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()))
-				.forEachOrdered(pw::println);
-		}
+		FileUtils.replace(
+			kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
+			ZK_DATA_DIR_PATTERN,
+			matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath())
+		);
 
 		LOG.info("Updating Kafka properties");
-		final Path kafkaPropertiesFile = kafkaDir.resolve(Paths.get("config", "server.properties"));
-		final List<String> kafkaPropertiesFileLines = Files.readAllLines(kafkaPropertiesFile);
-		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-			kafkaPropertiesFileLines.stream()
-				.map(line -> KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()))
-				.forEachOrdered(pw::println);
-		}
+		FileUtils.replace(
+			kafkaDir.resolve(Paths.get("config", "server.properties")),
+			KAFKA_LOG_DIR_PATTERN,
+			matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath())
+		);
 	}
 
 	private void setupKafkaCluster() throws IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
new file mode 100644
index 0000000..acfa882
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Collection of file-related utilities.
+ */
/**
 * Collection of file-related utilities.
 */
public class FileUtils {

	/** Private constructor to prevent instantiation of this utility class. */
	private FileUtils() {
	}

	/**
	 * Matches the given {@link Pattern} against all lines in the given file, and replaces every fully
	 * matching line with the replacement generated by the given {@link Function}.
	 *
	 * <p>All unmatched lines and provided replacements are written back into the file, with the order
	 * corresponding to the original content. A newline is appended to each written line; this implies
	 * that an empty replacement string results in an empty line being written.
	 *
	 * @param file file to process; read and overwritten in place, encoded as UTF-8
	 * @param pattern pattern that a line must match in its entirety for it to be replaced
	 * @param replacer computes the replacement line from the successful {@link Matcher}
	 * @throws IOException if the file could not be read or written
	 */
	public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException {
		// read everything up front so we can safely overwrite the same file afterwards
		final List<String> originalLines = Files.readAllLines(file, StandardCharsets.UTF_8);

		final List<String> modifiedLines = new ArrayList<>(originalLines.size());
		for (String line : originalLines) {
			final Matcher matcher = pattern.matcher(line);
			// matches() requires the pattern to cover the entire line; partial matches are kept as-is
			modifiedLines.add(matcher.matches() ? replacer.apply(matcher) : line);
		}

		// Files.write truncates the file and propagates I/O errors; the previous PrintWriter-based
		// approach silently swallowed write failures (PrintWriter never throws, only sets checkError())
		Files.write(file, modifiedLines, StandardCharsets.UTF_8);
	}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
new file mode 100644
index 0000000..43eadab
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for {@link FileUtils}.
+ */
+public class FileUtilsTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private static final List<String> ORIGINAL_LINES = Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
+	private Path testFile;
+
+	@Before
+	public void setupFile() throws IOException {
+		Path path = TMP.newFile().toPath();
+
+		Files.write(path, ORIGINAL_LINES);
+
+		testFile = path;
+	}
+
+	@Test
+	public void replaceSingleMatch() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");
+
+		Assert.assertEquals(Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+	}
+
+	@Test
+	public void replaceMultipleMatch() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));
+
+		Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
+	}
+
+	@Test
+	public void replaceWithEmptyLine() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");
+
+		Assert.assertEquals(Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+	}
+}