You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/19 08:57:31 UTC
[flink] 01/02: [FLINK-18343][e2e] Refactor file-line replacement into separate method
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit febe8e30bcf933647221248278068a4a6ecc3c5e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 17 13:08:36 2020 +0200
[FLINK-18343][e2e] Refactor file-line replacement into separate method
---
.../util/kafka/LocalStandaloneKafkaResource.java | 28 ++++----
.../apache/flink/tests/util/util/FileUtils.java | 58 +++++++++++++++++
.../flink/tests/util/util/FileUtilsTest.java | 76 ++++++++++++++++++++++
3 files changed, 145 insertions(+), 17 deletions(-)
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index e1a3ff4..06af5be 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -24,6 +24,7 @@ import org.apache.flink.tests.util.CommandLineWrapper;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.util.FileUtils;
import org.apache.flink.util.OperatingSystem;
import org.junit.rules.TemporaryFolder;
@@ -33,15 +34,12 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.PrintStream;
-import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -115,22 +113,18 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
.build());
LOG.info("Updating ZooKeeper properties");
- final Path zookeeperPropertiesFile = kafkaDir.resolve(Paths.get("config", "zookeeper.properties"));
- final List<String> zookeeperPropertiesFileLines = Files.readAllLines(zookeeperPropertiesFile);
- try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
- zookeeperPropertiesFileLines.stream()
- .map(line -> ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()))
- .forEachOrdered(pw::println);
- }
+ FileUtils.replace(
+ kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
+ ZK_DATA_DIR_PATTERN,
+ matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath())
+ );
LOG.info("Updating Kafka properties");
- final Path kafkaPropertiesFile = kafkaDir.resolve(Paths.get("config", "server.properties"));
- final List<String> kafkaPropertiesFileLines = Files.readAllLines(kafkaPropertiesFile);
- try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
- kafkaPropertiesFileLines.stream()
- .map(line -> KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()))
- .forEachOrdered(pw::println);
- }
+ FileUtils.replace(
+ kafkaDir.resolve(Paths.get("config", "server.properties")),
+ KAFKA_LOG_DIR_PATTERN,
+ matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath())
+ );
}
private void setupKafkaCluster() throws IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
new file mode 100644
index 0000000..acfa882
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Collection of file-related utilities.
+ */
+public class FileUtils {
+
+    /**
+     * Rewrites the given file line by line: every line that the given {@link Pattern} matches in its entirety is
+     * replaced with the string generated by the given {@link Function}; all other lines are kept unchanged and the
+     * original order is preserved. The file is read and written as UTF-8. A line separator is appended to every
+     * line; this implies that an empty replacement string results in an empty line being written.
+     *
+     * @throws IOException if the file could not be read, or the new content could not be written completely
+     */
+    public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException {
+        final List<String> fileLines = Files.readAllLines(file, StandardCharsets.UTF_8);
+        try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8))) {
+            for (String fileLine : fileLines) {
+                final Matcher matcher = pattern.matcher(fileLine);
+                pw.println(matcher.matches() ? replacer.apply(matcher) : fileLine);
+            }
+            // PrintWriter never throws on write failures; surface them instead of leaving a truncated file unnoticed
+            if (pw.checkError()) {
+                throw new IOException("Failed to write replaced content to " + file);
+            }
+        }
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
new file mode 100644
index 0000000..43eadab
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Unit tests covering the line replacement offered by {@link FileUtils}.
+ */
+public class FileUtilsTest extends TestLogger {
+
+    @ClassRule
+    public static final TemporaryFolder TMP = new TemporaryFolder();
+
+    private static final List<String> ORIGINAL_LINES = Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
+    private Path testFile;
+
+    /** Creates a fresh file containing {@link #ORIGINAL_LINES} before every test. */
+    @Before
+    public void setupFile() throws IOException {
+        testFile = Files.write(TMP.newFile().toPath(), ORIGINAL_LINES);
+    }
+
+    /** A pattern matching a single line must only replace that line. */
+    @Test
+    public void replaceSingleMatch() throws IOException {
+        final List<String> expected = Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2));
+        FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");
+        Assert.assertEquals(expected, Files.readAllLines(testFile));
+    }
+
+    /** A pattern matching every line must replace all of them, each with its own replacement. */
+    @Test
+    public void replaceMultipleMatch() throws IOException {
+        final List<String> expected = Arrays.asList("1", "2", "3");
+        FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));
+        Assert.assertEquals(expected, Files.readAllLines(testFile));
+    }
+
+    /** An empty replacement must leave an empty line in place of the matched line. */
+    @Test
+    public void replaceWithEmptyLine() throws IOException {
+        final List<String> expected = Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2));
+        FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");
+        Assert.assertEquals(expected, Files.readAllLines(testFile));
+    }
+}