You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/23 13:25:19 UTC

[flink] branch master updated: [FLINK-11355][tests] Remove JobManagerProcessReapingTest

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c4df99  [FLINK-11355][tests] Remove JobManagerProcessReapingTest
1c4df99 is described below

commit 1c4df9954e2ba103f09c4160ee918b625c8dc10d
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Fri Jan 18 15:12:51 2019 +0100

    [FLINK-11355][tests] Remove JobManagerProcessReapingTest
    
    The JobManagerProcessReapingTest is no longer relevant since we don't start a
    ProcessReaper as it used to be the case in the legacy mode.
---
 .../jobmanager/JobManagerProcessReapingTest.java   | 246 ---------------------
 1 file changed, 246 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
deleted file mode 100644
index fc16483..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerProcessReapingTest.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import static org.junit.Assert.*;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClasspath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
-import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-
-import org.apache.flink.configuration.WebOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.jobmaster.JobMaster;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import org.apache.flink.configuration.Configuration;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.duration.FiniteDuration;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Tests that the JobManager process properly exits when the JobManager actor dies.
- */
-public class JobManagerProcessReapingTest extends TestLogger {
-
-	@Test
-	public void testReapProcessOnFailure() {
-		Process jmProcess = null;
-		ActorSystem localSystem = null;
-
-		final StringWriter processOutput = new StringWriter();
-
-		try {
-			String javaCommand = getJavaCommandPath();
-
-			// check that we run this test only if the java command
-			// is available on this machine
-			if (javaCommand == null) {
-				System.out.println("---- Skipping JobManagerProcessReapingTest : Could not find java executable ----");
-				return;
-			}
-
-			// create a logging file for the process
-			File tempLogFile = File.createTempFile("testlogconfig", "properties");
-			tempLogFile.deleteOnExit();
-			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
-
-			// start a JobManger process
-			// the log level must be at least INFO, otherwise the bound port cannot be retrieved
-			String[] command = new String[] {
-					javaCommand,
-					"-Dlog.level=DEBUG",
-					"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
-					"-Xms256m", "-Xmx256m",
-					"-classpath", getCurrentClasspath(),
-					JobManagerTestEntryPoint.class.getName()
-			};
-
-			// spawn the process and collect its output
-			ProcessBuilder bld = new ProcessBuilder(command);
-			jmProcess = bld.start();
-			new PipeForwarder(jmProcess.getErrorStream(), processOutput);
-
-			// start another actor system so we can send something to the JobManager
-			Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", 0);
-			localSystem = AkkaUtils.createActorSystem(
-					new Configuration(), new Some<Tuple2<String, Object>>(localAddress));
-
-			// grab the reference to the JobManager. try multiple times, until the process
-			// is started and the JobManager is up
-			ActorRef jobManagerRef = null;
-			Throwable lastError = null;
-
-			// Log message on JobManager must be: Starting JobManager at ...://flink@...:port/..."
-			// otherwise, the pattern does not match and, thus, cannot retrieve the bound port
-			String pattern = "Starting JobManager at [^:]*://flink@[^:]*:(\\d*)/";
-			Pattern r = Pattern.compile(pattern);
-			int jobManagerPort = -1;
-
-			for (int i = 0; i < 40; i++) {
-				Matcher m = r.matcher(processOutput.toString());
-
-				if (m.find()) {
-					jobManagerPort = Integer.parseInt(m.group(1));
-					break;
-				}
-
-				Thread.sleep(500);
-			}
-
-			if (jobManagerPort != -1) {
-				try {
-					final String jobManagerAkkaUrl = AkkaRpcServiceUtils.getRpcUrl(
-						"localhost",
-						jobManagerPort,
-						JobMaster.JOB_MANAGER_NAME,
-						HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION,
-						AkkaRpcServiceUtils.AkkaProtocol.TCP);
-
-					jobManagerRef = AkkaUtils.getActorRef(jobManagerAkkaUrl, localSystem, new FiniteDuration(25L, TimeUnit.SECONDS));
-				} catch (Throwable t) {
-					// job manager probably not ready yet
-					lastError = t;
-				}
-			} else {
-				fail("Could not determine port of started JobManager.");
-			}
-
-			assertTrue("JobManager process died", isProcessAlive(jmProcess));
-
-			if (jobManagerRef == null) {
-				if (lastError != null) {
-					lastError.printStackTrace();
-				}
-				fail("JobManager process did not launch the JobManager properly. Failed to look up JobManager actor at"
-						+ " localhost:" + jobManagerPort);
-			}
-
-			// kill the JobManager actor
-			jobManagerRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
-
-			// wait for max 5 seconds for the process to terminate
-			{
-				long now = System.currentTimeMillis();
-				long deadline = now + 5000;
-
-				while (now < deadline && isProcessAlive(jmProcess)) {
-					Thread.sleep(100);
-					now = System.currentTimeMillis();
-				}
-			}
-
-			assertFalse("JobManager process did not terminate upon actor death", isProcessAlive(jmProcess));
-
-			int returnCode = jmProcess.exitValue();
-			assertEquals("JobManager died, but not because of the process reaper",
-					JobManager.RUNTIME_FAILURE_RETURN_CODE(), returnCode);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			printProcessLog(processOutput.toString());
-			fail(e.getMessage());
-		}
-		catch (Error e) {
-			e.printStackTrace();
-			printProcessLog(processOutput.toString());
-			throw e;
-		}
-		finally {
-			if (jmProcess != null) {
-				jmProcess.destroy();
-			}
-			if (localSystem != null) {
-				localSystem.terminate();
-			}
-		}
-	}
-
-	private static void printProcessLog(String log) {
-		System.out.println("-----------------------------------------");
-		System.out.println("       BEGIN SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-		System.out.println(log);
-		System.out.println("-----------------------------------------");
-		System.out.println("        END SPAWNED PROCESS LOG");
-		System.out.println("-----------------------------------------");
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static class JobManagerTestEntryPoint {
-
-		public static void main(String[] args) {
-			try {
-				Configuration config = new Configuration();
-				config.setInteger(WebOptions.PORT, -1);
-
-				JobManager.runJobManager(config, JobManagerMode.CLUSTER, "localhost", 0);
-				System.exit(0);
-			}
-			catch (Throwable t) {
-				System.exit(1);
-			}
-		}
-	}
-
-	private static class PipeForwarder extends Thread {
-
-		private final StringWriter target;
-		private final InputStream source;
-
-		public PipeForwarder(InputStream source, StringWriter target) {
-			super("Pipe Forwarder");
-			setDaemon(true);
-
-			this.source = source;
-			this.target = target;
-
-			start();
-		}
-
-		@Override
-		public void run() {
-			try {
-				int next;
-				while ((next = source.read()) != -1) {
-					target.write(next);
-				}
-			}
-			catch (IOException e) {
-				// terminate
-			}
-		}
-	}
-}