You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/19 08:57:30 UTC

[flink] branch master updated (c8e9d0d -> f8b0bff)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c8e9d0d  [FLINK-18368][tests] Cleanup Kerberos settings after test.
     new febe8e3  [FLINK-18343][e2e] Refactor file-line replacement into separate method
     new f8b0bff  [FLINK-18343][e2e] Set Flink rootLogger to DEBUG

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../util/kafka/LocalStandaloneKafkaResource.java   | 28 ++++----
 .../flink/tests/util/flink/FlinkDistribution.java  | 11 ++++
 .../util/flink/LocalStandaloneFlinkResource.java   |  2 +
 .../apache/flink/tests/util/util/FileUtils.java    | 58 +++++++++++++++++
 .../flink/tests/util/util/FileUtilsTest.java       | 76 ++++++++++++++++++++++
 5 files changed, 158 insertions(+), 17 deletions(-)
 create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
 create mode 100644 flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java


[flink] 02/02: [FLINK-18343][e2e] Set Flink rootLogger to DEBUG

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8b0bffb28aebb2db3415907c78e85576b84c206
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 17 13:08:58 2020 +0200

    [FLINK-18343][e2e] Set Flink rootLogger to DEBUG
---
 .../org/apache/flink/tests/util/flink/FlinkDistribution.java  | 11 +++++++++++
 .../flink/tests/util/flink/LocalStandaloneFlinkResource.java  |  2 ++
 2 files changed, 13 insertions(+)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
index 9516880..07c910e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkDistribution.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.util.FileUtils;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -34,6 +35,7 @@ import okhttp3.Request;
 import okhttp3.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;
@@ -66,6 +68,8 @@ final class FlinkDistribution {
 
 	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
+	private static final Pattern ROOT_LOGGER_PATTERN = Pattern.compile("(rootLogger.level =).*");
+
 	private final Path opt;
 	private final Path lib;
 	private final Path conf;
@@ -96,6 +100,13 @@ final class FlinkDistribution {
 		AutoClosableProcess.runBlocking(bin.resolve("taskmanager.sh").toAbsolutePath().toString(), "start");
 	}
 
+	public void setRootLogLevel(Level logLevel) throws IOException {
+		FileUtils.replace(
+			conf.resolve("log4j.properties"),
+			ROOT_LOGGER_PATTERN,
+			matcher -> matcher.group(1) + " " + logLevel.name());
+	}
+
 	public void startFlinkCluster() throws IOException {
 		LOG.info("Starting Flink cluster.");
 		AutoClosableProcess.runBlocking(bin.resolve("start-cluster.sh").toAbsolutePath().toString());
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 6a95d55..55ef08c 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -34,6 +34,7 @@ import org.apache.flink.util.ConfigurationException;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import javax.annotation.Nullable;
 
@@ -80,6 +81,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
 		TestUtils.copyDirectory(distributionDirectory, tmp);
 
 		distribution = new FlinkDistribution(tmp);
+		distribution.setRootLogLevel(Level.DEBUG);
 		for (JarOperation jarOperation : setup.getJarOperations()) {
 			distribution.performJarOperation(jarOperation);
 		}


[flink] 01/02: [FLINK-18343][e2e] Refactor file-line replacement into separate method

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit febe8e30bcf933647221248278068a4a6ecc3c5e
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Wed Jun 17 13:08:36 2020 +0200

    [FLINK-18343][e2e] Refactor file-line replacement into separate method
---
 .../util/kafka/LocalStandaloneKafkaResource.java   | 28 ++++----
 .../apache/flink/tests/util/util/FileUtils.java    | 58 +++++++++++++++++
 .../flink/tests/util/util/FileUtilsTest.java       | 76 ++++++++++++++++++++++
 3 files changed, 145 insertions(+), 17 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index e1a3ff4..06af5be 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -24,6 +24,7 @@ import org.apache.flink.tests.util.CommandLineWrapper;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.tests.util.cache.DownloadCache;
+import org.apache.flink.tests.util.util.FileUtils;
 import org.apache.flink.util.OperatingSystem;
 
 import org.junit.rules.TemporaryFolder;
@@ -33,15 +34,12 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.PrintStream;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -115,22 +113,18 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 			.build());
 
 		LOG.info("Updating ZooKeeper properties");
-		final Path zookeeperPropertiesFile = kafkaDir.resolve(Paths.get("config", "zookeeper.properties"));
-		final List<String> zookeeperPropertiesFileLines = Files.readAllLines(zookeeperPropertiesFile);
-		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(zookeeperPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-			zookeeperPropertiesFileLines.stream()
-				.map(line -> ZK_DATA_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath()))
-				.forEachOrdered(pw::println);
-		}
+		FileUtils.replace(
+			kafkaDir.resolve(Paths.get("config", "zookeeper.properties")),
+			ZK_DATA_DIR_PATTERN,
+			matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("zookeeper").toAbsolutePath())
+		);
 
 		LOG.info("Updating Kafka properties");
-		final Path kafkaPropertiesFile = kafkaDir.resolve(Paths.get("config", "server.properties"));
-		final List<String> kafkaPropertiesFileLines = Files.readAllLines(kafkaPropertiesFile);
-		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(kafkaPropertiesFile, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
-			kafkaPropertiesFileLines.stream()
-				.map(line -> KAFKA_LOG_DIR_PATTERN.matcher(line).replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath()))
-				.forEachOrdered(pw::println);
-		}
+		FileUtils.replace(
+			kafkaDir.resolve(Paths.get("config", "server.properties")),
+			KAFKA_LOG_DIR_PATTERN,
+			matcher -> matcher.replaceAll("$1" + kafkaDir.resolve("kafka").toAbsolutePath())
+		);
 	}
 
 	private void setupKafkaCluster() throws IOException {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
new file mode 100644
index 0000000..acfa882
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/util/FileUtils.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.List;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Collection of file-related utilities.
+ */
+public class FileUtils {
+
+	/**
+	 * Matches the given {@link Pattern} against each line of the given file, and replaces every line that matches in its
+	 * entirety with the replacement generated by the given {@link Function}.
+	 * All unmatched lines and provided replacements are written back into the file, in the order of the
+	 * original content. A newline is automatically appended to each line; this implies that an empty replacement string
+	 * results in an empty line being written.
+	 */
+	public static void replace(Path file, Pattern pattern, Function<Matcher, String> replacer) throws IOException {
+		final List<String> fileLines = Files.readAllLines(file);
+		try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(Files.newOutputStream(file, StandardOpenOption.TRUNCATE_EXISTING), StandardCharsets.UTF_8.name()))) {
+			for (String fileLine : fileLines) {
+				Matcher matcher = pattern.matcher(fileLine);
+				if (matcher.matches()) {
+					String replacement = replacer.apply(matcher);
+					pw.println(replacement);
+				} else {
+					pw.println(fileLine);
+				}
+			}
+		}
+	}
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
new file mode 100644
index 0000000..43eadab
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/test/java/org/apache/flink/tests/util/util/FileUtilsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.util;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for {@link FileUtils}.
+ */
+public class FileUtilsTest extends TestLogger {
+
+	@ClassRule
+	public static final TemporaryFolder TMP = new TemporaryFolder();
+
+	private static final List<String> ORIGINAL_LINES = Collections.unmodifiableList(Arrays.asList("line1", "line2", "line3"));
+	private Path testFile;
+
+	@Before
+	public void setupFile() throws IOException {
+		Path path = TMP.newFile().toPath();
+
+		Files.write(path, ORIGINAL_LINES);
+
+		testFile = path;
+	}
+
+	@Test
+	public void replaceSingleMatch() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line1"), matcher -> "removed");
+
+		Assert.assertEquals(Arrays.asList("removed", ORIGINAL_LINES.get(1), ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+	}
+
+	@Test
+	public void replaceMultipleMatch() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line(.*)"), matcher -> matcher.group(1));
+
+		Assert.assertEquals(Arrays.asList("1", "2", "3"), Files.readAllLines(testFile));
+	}
+
+	@Test
+	public void replaceWithEmptyLine() throws IOException {
+		FileUtils.replace(testFile, Pattern.compile("line2"), matcher -> "");
+
+		Assert.assertEquals(Arrays.asList(ORIGINAL_LINES.get(0), "", ORIGINAL_LINES.get(2)), Files.readAllLines(testFile));
+	}
+}