You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2020/06/17 07:49:37 UTC

[flink] branch release-1.11 updated (8192e1b -> a2f2975)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 8192e1b  Revert "[FLINK-18072][hbase] Fix HBaseLookupFunction can not work with new internal data structure RowData"
     new bb5a01a  [hotfix][e2e] Add 'flink' prefix to flink log backup directory
     new a2f2975  [FLINK-18301][e2e] Backup kafka logs on failure

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../util/kafka/LocalStandaloneKafkaResource.java   | 38 ++++++++++++++++++++--
 .../kafka/LocalStandaloneKafkaResourceFactory.java | 18 +++++++++-
 .../util/flink/LocalStandaloneFlinkResource.java   |  2 +-
 3 files changed, 53 insertions(+), 5 deletions(-)


[flink] 01/02: [hotfix][e2e] Add 'flink' prefix to flink log backup directory

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb5a01a081bc8eebd70470704037471cf8697307
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jun 15 13:05:24 2020 +0200

    [hotfix][e2e] Add 'flink' prefix to flink log backup directory
---
 .../org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
index 6ea327e..d52e6dc 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/LocalStandaloneFlinkResource.java
@@ -112,7 +112,7 @@ public class LocalStandaloneFlinkResource implements FlinkResource {
 
 	private void backupLogs() {
 		if (logBackupDirectory != null) {
-			final Path targetDirectory = logBackupDirectory.resolve(UUID.randomUUID().toString());
+			final Path targetDirectory = logBackupDirectory.resolve("flink-" + UUID.randomUUID().toString());
 			try {
 				distribution.copyLogsTo(targetDirectory);
 				LOG.info("Backed up logs to {}.", targetDirectory);


[flink] 02/02: [FLINK-18301][e2e] Backup kafka logs on failure

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a2f2975578b53c2ae91a3fd66ce3a2761447a7fe
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Mon Jun 15 13:05:03 2020 +0200

    [FLINK-18301][e2e] Backup kafka logs on failure
---
 .../util/kafka/LocalStandaloneKafkaResource.java   | 38 ++++++++++++++++++++--
 .../kafka/LocalStandaloneKafkaResourceFactory.java | 18 +++++++++-
 2 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 7b0ecf0..e1a3ff4 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -21,6 +21,7 @@ package org.apache.flink.tests.util.kafka;
 import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.AutoClosableProcess;
 import org.apache.flink.tests.util.CommandLineWrapper;
+import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.activation.OperatingSystemRestriction;
 import org.apache.flink.tests.util.cache.DownloadCache;
 import org.apache.flink.util.OperatingSystem;
@@ -29,6 +30,8 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
@@ -45,6 +48,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -72,12 +76,15 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 	private final DownloadCache downloadCache = DownloadCache.get();
 	private final String kafkaVersion;
 	private Path kafkaDir;
+	@Nullable
+	private Path logBackupDirectory;
 
-	LocalStandaloneKafkaResource(final String kafkaVersion) {
+	LocalStandaloneKafkaResource(final String kafkaVersion, @Nullable Path logBackupDirectory) {
 		OperatingSystemRestriction.forbid(
 			String.format("The %s relies on UNIX utils and shell scripts.", getClass().getSimpleName()),
 			OperatingSystem.WINDOWS);
 		this.kafkaVersion = kafkaVersion;
+		this.logBackupDirectory = logBackupDirectory;
 	}
 
 	private static String getKafkaDownloadUrl(final String kafkaVersion) {
@@ -162,6 +169,20 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 
 	@Override
 	public void afterTestSuccess() {
+		shutdownResource();
+		downloadCache.afterTestSuccess();
+		tmp.delete();
+	}
+
+	@Override
+	public void afterTestFailure() {
+		shutdownResource();
+		backupLogs();
+		downloadCache.afterTestFailure();
+		tmp.delete();
+	}
+
+	private void shutdownResource() {
 		try {
 			AutoClosableProcess.runBlocking(
 				kafkaDir.resolve(Paths.get("bin", "kafka-server-stop.sh")).toString()
@@ -192,8 +213,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
 		} catch (IOException ioe) {
 			LOG.warn("Error while shutting down zookeeper.", ioe);
 		}
-		downloadCache.afterTestSuccess();
-		tmp.delete();
+	}
+
+	private void backupLogs() {
+		if (logBackupDirectory != null) {
+			final Path targetDirectory = logBackupDirectory.resolve("kafka-" + UUID.randomUUID().toString());
+			try {
+				Files.createDirectories(targetDirectory);
+				TestUtils.copyDirectory(kafkaDir.resolve("logs"), targetDirectory);
+				LOG.info("Backed up logs to {}.", targetDirectory);
+			} catch (IOException e) {
+				LOG.warn("An error has occurred while backing up logs to {}.", targetDirectory, e);
+			}
+		}
 	}
 
 	private static boolean isZookeeperRunning(final Path kafkaDir) {
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
index 74ef5a5..073d26e 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResourceFactory.java
@@ -18,13 +18,29 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.tests.util.parameters.ParameterProperty;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Optional;
+
 /**
  * A {@link KafkaResourceFactory} for the {@link LocalStandaloneKafkaResourceFactory}.
  */
 public final class LocalStandaloneKafkaResourceFactory implements KafkaResourceFactory {
+	private static final Logger LOG = LoggerFactory.getLogger(LocalStandaloneKafkaResourceFactory.class);
+
+	private static final ParameterProperty<Path> DISTRIBUTION_LOG_BACKUP_DIRECTORY = new ParameterProperty<>("logBackupDir", Paths::get);
 
 	@Override
 	public KafkaResource create(final String kafkaVersion) {
-		return new LocalStandaloneKafkaResource(kafkaVersion);
+		Optional<Path> logBackupDirectory = DISTRIBUTION_LOG_BACKUP_DIRECTORY.get();
+		if (!logBackupDirectory.isPresent()) {
+			LOG.warn("Property {} not set, logs will not be backed up in case of test failures.", DISTRIBUTION_LOG_BACKUP_DIRECTORY.getPropertyName());
+		}
+		return new LocalStandaloneKafkaResource(kafkaVersion, logBackupDirectory.orElse(null));
 	}
 }