Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/16 14:49:21 UTC

[1/3] flink git commit: [FLINK-9839][tests] Add support for SSL to e2e-tests

Repository: flink
Updated Branches:
  refs/heads/master 010f66547 -> e2e090b1a


[FLINK-9839][tests] Add support for SSL to e2e-tests

This closes #6327.
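
(Not part of the commit message; a minimal usage sketch.) A test script opts in to SSL roughly as follows, based on the script changes below; the source line and the omitted test body are assumptions about the surrounding e2e script:

    #!/usr/bin/env bash
    source "$(dirname "$0")"/common.sh

    backup_config    # let revert_default_config() restore flink-conf.yaml afterwards
    set_conf_ssl     # generate certificates under ${TEST_DATA_DIR}/ssl, enable security.ssl.*
    start_cluster    # the readiness check now polls the REST endpoint over HTTPS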


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2e090b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2e090b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2e090b1

Branch: refs/heads/master
Commit: e2e090b1a105f9bd20b6e6d0d354fefd5ab0fce9
Parents: 0a5aebb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Jul 16 13:06:36 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 16 14:09:42 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/test-scripts/common.sh   | 48 +++++++++++++++++++-
 .../test-scripts/test_batch_allround.sh         |  2 +
 .../test-scripts/test_streaming_bucketing.sh    |  2 +
 3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2e090b1/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index 941e6d1..c78afe7 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -36,6 +36,7 @@ export EXIT_CODE=0
 
 echo "Flink dist directory: $FLINK_DIR"
 
+USE_SSL=OFF # set via set_conf_ssl(), reset via revert_default_config()
 TEST_ROOT=`pwd`
 TEST_INFRA_DIR="$END_TO_END_DIR/test-scripts/"
 cd $TEST_INFRA_DIR
@@ -80,6 +81,8 @@ function revert_default_config() {
     if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then
         mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
     fi
+
+    USE_SSL=OFF
 }
 
 function set_conf() {
@@ -143,6 +146,43 @@ function create_ha_config() {
 EOL
 }
 
+function set_conf_ssl {
+
+    # clean up the dir that will be used for SSL certificates and trust stores
+    if [ -e "${TEST_DATA_DIR}/ssl" ]; then
+       echo "File ${TEST_DATA_DIR}/ssl exists. Deleting it..."
+       rm -rf "${TEST_DATA_DIR}/ssl"
+    fi
+    mkdir -p "${TEST_DATA_DIR}/ssl"
+    NODENAME=`hostname -f`
+    SANSTRING="dns:${NODENAME}"
+    for NODEIP in `hostname -I | cut -d' ' -f1` ; do
+        SANSTRING="${SANSTRING},ip:${NODEIP}"
+    done
+
+    # create certificates
+    keytool -genkeypair -alias ca -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -dname "CN=Sample CA" -storepass password -keypass password -keyalg RSA -ext bc=ca:true
+    keytool -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -storepass password -alias ca -exportcert > "${TEST_DATA_DIR}/ssl/ca.cer"
+    keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/ca.truststore" -alias ca -storepass password -noprompt -file "${TEST_DATA_DIR}/ssl/ca.cer"
+
+    keytool -genkeypair -alias node -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -dname "CN=${NODENAME}" -ext SAN=${SANSTRING} -storepass password -keypass password -keyalg RSA
+    keytool -certreq -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -alias node -file "${TEST_DATA_DIR}/ssl/node.csr"
+    keytool -gencert -keystore "${TEST_DATA_DIR}/ssl/ca.keystore" -storepass password -alias ca -ext SAN=${SANSTRING} -infile "${TEST_DATA_DIR}/ssl/node.csr" -outfile "${TEST_DATA_DIR}/ssl/node.cer"
+    keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -file "${TEST_DATA_DIR}/ssl/ca.cer" -alias ca -noprompt
+    keytool -importcert -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password -file "${TEST_DATA_DIR}/ssl/node.cer" -alias node -noprompt
+
+    # adapt config
+    # (here we rely on security.ssl.enabled enabling SSL for all components and internal as well as
+    # external communication channels)
+    set_conf security.ssl.enabled true
+    set_conf security.ssl.keystore ${TEST_DATA_DIR}/ssl/node.keystore
+    set_conf security.ssl.keystore-password password
+    set_conf security.ssl.key-password password
+    set_conf security.ssl.truststore ${TEST_DATA_DIR}/ssl/ca.truststore
+    set_conf security.ssl.truststore-password password
+    USE_SSL=ON
+}
+
 function start_ha_cluster {
     create_ha_config
     start_local_zk
@@ -178,9 +218,15 @@ function start_cluster {
   "$FLINK_DIR"/bin/start-cluster.sh
 
   # wait at most 10 seconds until the dispatcher is up
+  local QUERY_URL
+  if [ "x$USE_SSL" = "xON" ]; then
+    QUERY_URL="http://localhost:8081/taskmanagers"
+  else
+    QUERY_URL="https://localhost:8081/taskmanagers"
+  fi
   for i in {1..10}; do
     # without the || true this would exit our script if the JobManager is not yet up
-    QUERY_RESULT=$(curl "http://localhost:8081/taskmanagers" 2> /dev/null || true)
+    QUERY_RESULT=$(curl "$QUERY_URL" 2> /dev/null || true)
 
     # ensure the taskmanagers field is there at all and is not empty
     if [[ ${QUERY_RESULT} =~ \{\"taskmanagers\":\[.+\]\} ]]; then
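
(Illustration only, not part of the commit.) Assuming the defaults used above (store password "password", REST port 8081), the artifacts produced by set_conf_ssl and the secured endpoint can be checked by hand like this:

    # list the certificates in the node keystore created by set_conf_ssl
    keytool -list -keystore "${TEST_DATA_DIR}/ssl/node.keystore" -storepass password

    # probe the REST API over TLS; -k skips CA verification because the CA is self-signed
    curl -k https://localhost:8081/taskmanagers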

http://git-wip-us.apache.org/repos/asf/flink/blob/e2e090b1/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index 1cb4484..31c20f5 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -28,6 +28,8 @@ cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
 echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
 
+backup_config
+set_conf_ssl
 start_cluster
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start

http://git-wip-us.apache.org/repos/asf/flink/blob/e2e090b1/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
index 19da468..0e12d39 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_bucketing.sh
@@ -24,6 +24,8 @@ TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-bucketing-sink-test/target/BucketingSin
 # enable DEBUG logging level to retrieve truncate length later
 sed -i -e 's/#log4j.logger.org.apache.flink=INFO/log4j.logger.org.apache.flink=DEBUG/g' $FLINK_DIR/conf/log4j.properties
 
+backup_config
+set_conf_ssl
 start_cluster
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start


[2/3] flink git commit: [FLINK-9842][rest] Pass actual configuration to BlobClient

Posted by ch...@apache.org.
[FLINK-9842][rest] Pass actual configuration to BlobClient

This closes #6340.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0a5aebb0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0a5aebb0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0a5aebb0

Branch: refs/heads/master
Commit: 0a5aebb0149d3660e549446a3d46df34ef1fb4d2
Parents: 0777753
Author: zentol <ch...@apache.org>
Authored: Mon Jul 16 11:32:09 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 16 14:09:42 2018 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  3 +-
 .../rest/handler/job/JobSubmitHandler.java      | 12 +++--
 .../rest/handler/job/JobSubmitHandlerTest.java  | 49 ++++++++++++++------
 3 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0a5aebb0/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 4279330..ba080c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -92,7 +92,8 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway
 			leaderRetriever,
 			timeout,
 			responseHeaders,
-			executor);
+			executor,
+			clusterConfiguration);
 
 		if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/0a5aebb0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index 052b056..0854ed7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -62,15 +62,18 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
 	private static final String FILE_TYPE_ARTIFACT = "Artifact";
 
 	private final Executor executor;
+	private final Configuration configuration;
 
 	public JobSubmitHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> headers,
-			Executor executor) {
+			Executor executor,
+			Configuration configuration) {
 		super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance());
 		this.executor = executor;
+		this.configuration = configuration;
 	}
 
 	@Override
@@ -99,7 +102,7 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
 
 		Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
 
-		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts);
+		CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
 
 		CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
 
@@ -151,13 +154,14 @@ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGatewa
 			DispatcherGateway gateway,
 			CompletableFuture<JobGraph> jobGraphFuture,
 			Collection<Path> jarFiles,
-			Collection<Tuple2<String, Path>> artifacts) {
+			Collection<Tuple2<String, Path>> artifacts,
+			Configuration configuration) {
 		CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);
 
 		return jobGraphFuture.thenCombine(blobServerPortFuture, (JobGraph jobGraph, Integer blobServerPort) -> {
 			final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
 			try {
-				ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, new Configuration()));
+				ClientUtils.uploadJobGraphFiles(jobGraph, jarFiles, artifacts, () -> new BlobClient(address, configuration));
 			} catch (FlinkException e) {
 				throw new CompletionException(new RestHandlerException(
 					"Could not upload job files.",

http://git-wip-us.apache.org/repos/asf/flink/blob/0a5aebb0/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 0003829..be1cb79 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.net.SSLUtilsTest;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -39,12 +40,14 @@ import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
@@ -57,15 +60,30 @@ import java.util.concurrent.CompletableFuture;
 /**
  * Tests for the {@link JobSubmitHandler}.
  */
+@RunWith(Parameterized.class)
 public class JobSubmitHandlerTest extends TestLogger {
 
+	@Parameterized.Parameters(name = "SSL enabled: {0}")
+	public static Iterable<Boolean> data() {
+		return Arrays.asList(true, false);
+	}
+
 	@ClassRule
 	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
-	private static BlobServer blobServer;
 
-	@BeforeClass
-	public static void setup() throws IOException {
-		Configuration config = new Configuration();
+	private final Configuration configuration;
+
+	private BlobServer blobServer;
+
+	public JobSubmitHandlerTest(boolean withSsl) {
+		this.configuration = withSsl
+			? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores()
+			: new Configuration();
+	}
+
+	@Before
+	public void setup() throws IOException {
+		Configuration config = new Configuration(configuration);
 		config.setString(BlobServerOptions.STORAGE_DIRECTORY,
 			TEMPORARY_FOLDER.newFolder().getAbsolutePath());
 
@@ -73,8 +91,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 		blobServer.start();
 	}
 
-	@AfterClass
-	public static void teardown() throws IOException {
+	@After
+	public void teardown() throws IOException {
 		if (blobServer != null) {
 			blobServer.close();
 		}
@@ -92,7 +110,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
 			Collections.emptyMap(),
-			TestingUtils.defaultExecutor());
+			TestingUtils.defaultExecutor(),
+			configuration);
 
 		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.toString(), Collections.emptyList(), Collections.emptyList());
 
@@ -123,7 +142,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
 			Collections.emptyMap(),
-			TestingUtils.defaultExecutor());
+			TestingUtils.defaultExecutor(),
+			configuration);
 
 		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
 
@@ -151,7 +171,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
 			Collections.emptyMap(),
-			TestingUtils.defaultExecutor());
+			TestingUtils.defaultExecutor(),
+			configuration);
 
 		JobSubmitRequestBody request = new JobSubmitRequestBody(jobGraphFile.getFileName().toString(), Collections.emptyList(), Collections.emptyList());
 
@@ -181,7 +202,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 			() -> CompletableFuture.completedFuture(dispatcherGateway),
 			RpcUtils.INF_TIMEOUT,
 			Collections.emptyMap(),
-			TestingUtils.defaultExecutor());
+			TestingUtils.defaultExecutor(),
+			configuration);
 
 		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
 		final Path jarFile = TEMPORARY_FOLDER.newFile().toPath();
@@ -226,7 +248,8 @@ public class JobSubmitHandlerTest extends TestLogger {
 			() -> CompletableFuture.completedFuture(mockGateway),
 			RpcUtils.INF_TIMEOUT,
 			Collections.emptyMap(),
-			TestingUtils.defaultExecutor());
+			TestingUtils.defaultExecutor(),
+			configuration);
 
 		final Path jobGraphFile = TEMPORARY_FOLDER.newFile().toPath();
 


[3/3] flink git commit: [FLINK-9380][tests] Do not delete logs on E2E test failure

Posted by ch...@apache.org.
[FLINK-9380][tests] Do not delete logs on E2E test failure

This closes #6289.
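
(Illustration only.) The core of the change, simplified from the test-runner-common.sh diff below: temporary files and logs are removed only when the test command itself exits with 0; otherwise the runner exits and leaves them in place for inspection.

    if [[ ${exit_code} == 0 ]]; then
        cleanup                  # stop cluster, revert config, delete logs and TEST_DATA_DIR
    else
        exit "${exit_code}"      # keep logs and TEST_DATA_DIR of the failed run
    fi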


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07777536
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07777536
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07777536

Branch: refs/heads/master
Commit: 0777753614ff2fad2847ba7e7c801a706eb06bed
Parents: 010f665
Author: Deepak Sharma <de...@gmail.com>
Authored: Mon Jul 9 15:37:58 2018 -0400
Committer: zentol <ch...@apache.org>
Committed: Mon Jul 16 14:09:42 2018 +0200

----------------------------------------------------------------------
 flink-end-to-end-tests/test-scripts/common.sh   | 15 ++++---
 .../test-scripts/test-runner-common.sh          | 41 ++++++++++++++------
 2 files changed, 38 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/07777536/flink-end-to-end-tests/test-scripts/common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index e5ad458..941e6d1 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -37,16 +37,11 @@ export EXIT_CODE=0
 echo "Flink dist directory: $FLINK_DIR"
 
 TEST_ROOT=`pwd`
-TEST_INFRA_DIR="$0"
-TEST_INFRA_DIR=`dirname "$TEST_INFRA_DIR"`
+TEST_INFRA_DIR="$END_TO_END_DIR/test-scripts/"
 cd $TEST_INFRA_DIR
 TEST_INFRA_DIR=`pwd`
 cd $TEST_ROOT
 
-# used to randomize created directories
-export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
-echo "TEST_DATA_DIR: $TEST_DATA_DIR"
-
 function print_mem_use_osx {
     declare -a mem_types=("active" "inactive" "wired down")
     used=""
@@ -280,6 +275,12 @@ function check_logs_for_non_empty_out_files {
   fi
 }
 
+function shutdown_all {
+  stop_cluster
+  tm_kill_all
+  jm_kill_all
+}
+
 function stop_cluster {
   "$FLINK_DIR"/bin/stop-cluster.sh
 
@@ -536,10 +537,12 @@ function end_timer {
 
 function clean_stdout_files {
     rm ${FLINK_DIR}/log/*.out
+    echo "Deleted all stdout files under ${FLINK_DIR}/log/"
 }
 
 function clean_log_files {
     rm ${FLINK_DIR}/log/*
+    echo "Deleted all files under ${FLINK_DIR}/log/"
 }
 
 # Expect a string to appear in the log files of the task manager before a given timeout

http://git-wip-us.apache.org/repos/asf/flink/blob/07777536/flink-end-to-end-tests/test-scripts/test-runner-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test-runner-common.sh b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
index 8758da0..fee67a4 100644
--- a/flink-end-to-end-tests/test-scripts/test-runner-common.sh
+++ b/flink-end-to-end-tests/test-scripts/test-runner-common.sh
@@ -32,6 +32,11 @@ function run_test {
     printf "\n==============================================================================\n"
     printf "Running '${description}'\n"
     printf "==============================================================================\n"
+
+    # used to randomize created directories
+    export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
+    echo "TEST_DATA_DIR: $TEST_DATA_DIR"
+
     start_timer
     ${command}
     exit_code="$?"
@@ -41,8 +46,8 @@ function run_test {
     check_logs_for_exceptions
     check_logs_for_non_empty_out_files
 
-    cleanup
-
+    # Investigate exit_code for failures of test executable as well as EXIT_CODE for failures of the test.
+    # Do not clean up if either fails.
     if [[ ${exit_code} == 0 ]]; then
         if [[ ${EXIT_CODE} != 0 ]]; then
             printf "\n[FAIL] '${description}' failed after ${time_elapsed}! Test exited with exit code 0 but the logs contained errors, exceptions or non-empty .out files\n\n"
@@ -58,20 +63,32 @@ function run_test {
         fi
     fi
 
-    if [[ ${exit_code} != 0 ]]; then
+    if [[ ${exit_code} == 0 ]]; then
+        cleanup
+    else
         exit "${exit_code}"
     fi
 }
 
-# Shuts down the cluster and cleans up all temporary folders and files. Make sure to clean up even in case of failures.
+# Shuts down cluster and reverts changes to cluster configs
+function cleanup_proc {
+    shutdown_all
+    revert_default_config
+}
+
+# Cleans up all temporary folders and files
+function cleanup_tmp_files {
+    clean_log_files
+
+    rm -rf ${TEST_DATA_DIR} 2> /dev/null
+    echo "Deleted ${TEST_DATA_DIR}"
+}
+
+# Shuts down the cluster and cleans up all temporary folders and files.
 function cleanup {
-  stop_cluster
-  tm_kill_all
-  jm_kill_all
-  rm -rf $TEST_DATA_DIR 2> /dev/null
-  revert_default_config
-  clean_log_files
-  clean_stdout_files
+    cleanup_proc
+    cleanup_tmp_files
 }
 
-trap cleanup EXIT
+trap cleanup SIGINT
+trap cleanup_proc EXIT
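
(Summary sketch, simplified from the diff above.) The runner now splits the traps: an interrupt still performs the full cleanup, while the regular EXIT path only stops the cluster and reverts the configuration, so log files and TEST_DATA_DIR survive failed runs.

    trap cleanup SIGINT       # Ctrl-C: shut down, revert config, delete temp files and logs
    trap cleanup_proc EXIT    # normal exit: shut down and revert config only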