You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:36 UTC
[05/48] oozie git commit: OOZIE-2591 fix TestWorkflowActionKillXCommand and refactor TestJavaActionExecutor
OOZIE-2591 fix TestWorkflowActionKillXCommand and refactor TestJavaActionExecutor
Change-Id: I556684dee7a04a931e6cf1b33de563b7ba4828b2
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61f3a9f0
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61f3a9f0
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61f3a9f0
Branch: refs/heads/oya
Commit: 61f3a9f083b5085bbc575d7e9d251aec03bfcae4
Parents: 9e2acd0
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Thu Nov 10 12:27:08 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Thu Nov 10 16:06:34 2016 +0100
----------------------------------------------------------------------
.../action/hadoop/TestJavaActionExecutor.java | 33 +++++++---------
.../wf/TestWorkflowActionKillXCommand.java | 37 +++++++++---------
.../java/org/apache/oozie/test/XTestCase.java | 40 +++++++-------------
.../oozie/action/hadoop/LauncherMain.java | 9 ++++-
4 files changed, 54 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 5d8bf34..bfc8ab4 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -27,7 +27,6 @@ import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.io.Writer;
import java.net.URI;
-import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -35,10 +34,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +60,6 @@ import org.apache.oozie.service.ShareLibService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.service.UserGroupInformationService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
@@ -100,7 +96,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
}
- @SuppressWarnings("unchecked")
public void testSetupMethods() throws Exception {
JavaActionExecutor ae = new JavaActionExecutor();
assertEquals(Arrays.asList(JavaMain.class), ae.getLauncherClasses());
@@ -365,7 +360,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
"</java>";
Context context = createContext(actionXml, null);
submitAction(context);
- waitUntilYarnAppState(context.getAction().getExternalId(), YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -385,7 +380,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
"</java>";
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -410,7 +405,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
"</java>";
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
ActionExecutor ae = new JavaActionExecutor();
try {
ae.check(context, context.getAction());
@@ -441,7 +436,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
final String runningJobId = submitAction(context);
ActionExecutor ae = new JavaActionExecutor();
assertFalse(ae.isCompleted(context.getAction().getExternalStatus()));
- waitUntilYarnAppState(runningJobId, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJobId);
ae.check(context, context.getAction());
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
assertNull(context.getAction().getData());
@@ -460,7 +455,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
@@ -481,7 +476,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
//FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
@@ -504,7 +499,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
//FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
@@ -526,7 +521,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
//FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
@@ -551,7 +546,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
assertEquals("KILLED", context.getAction().getExternalStatus());
assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
- waitUntilYarnAppState(runningJob, YarnApplicationState.KILLED);
+ waitUntilYarnAppKilledAndAssertSuccess(runningJob);
}
@@ -827,7 +822,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
"</java>";
Context context = createContext(actionXml, null);
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
ActionExecutor ae = new JavaActionExecutor();
ae.check(context, context.getAction());
assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -1876,13 +1871,13 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
// Test when server side setting is not enabled
JobConf launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf);
- assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
+ assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); // disabled by default
ConfigurationService.set("oozie.action.launcher." + JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED, "true");
// Test when server side setting is enabled but tez-site.xml is not in DistributedCache
launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf);
- assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
+ assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
final Path tezSite = new Path("/tmp/tez-site.xml");
final FSDataOutputStream out = getFileSystem().create(tezSite);
@@ -2202,7 +2197,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
ConfigurationService.set("oozie.action.sharelib.for.java", "java");
final String runningJob = submitAction(context);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
}
public void testJobSubmissionWithoutYarnKill() throws Exception {
@@ -2236,7 +2231,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
ConfigurationService.setBoolean(JavaActionExecutor.HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART, false);
final String runningJob = submitAction(context, ae);
- waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+ waitUntilYarnAppDoneAndAssertSuccess(runningJob);
}
public void testDefaultConfigurationInLauncher() throws Exception {
http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 72f0114..71b46d1 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -18,19 +18,18 @@
package org.apache.oozie.command.wf;
-import java.io.StringReader;
import java.net.URI;
import java.util.Date;
+import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.LauncherMain;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowInstance;
+import com.google.common.collect.Sets;
+
public class TestWorkflowActionKillXCommand extends XDataTestCase {
private Services services;
@@ -117,6 +115,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
assertEquals(action.getExternalStatus(), "RUNNING");
}
+ // FIXME - fix JAE.kill()
public void testWfActionKillChildJob() throws Exception {
String externalJobID = launchSleepJob(1000);
String childId = launchSleepJob(1000000);
@@ -126,15 +125,8 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
WorkflowAction.Status.KILLED, childId);
new ActionKillXCommand(action.getId()).call();
- JobClient jobClient = createJobClient();
- final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
- waitFor(60 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return mrJob.isComplete();
- }
- });
- assertEquals(mrJob.getJobState(), JobStatus.KILLED);
+ waitUntilYarnAppKilledAndAssertSuccess(childId);
}
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
@@ -189,9 +181,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
SleepJob sleepjob = new SleepJob();
sleepjob.setConf(jobConf);
jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
+ jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
+ jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
+ System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+
+ jobClient.submitJob(jobConf);
+ Set<ApplicationId> apps = Sets.newHashSet();
+ apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);
+ assertEquals("Number of YARN apps", apps.size(), 1);
+
+ sleepjob.close();
- final RunningJob runningJob = jobClient.submitJob(jobConf);
- return runningJob.getID().toString();
+ return apps.iterator().next().toString();
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index f0c79b6..fd6d4ad 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -44,10 +43,8 @@ import javax.persistence.FlushModeType;
import javax.persistence.Query;
import junit.framework.TestCase;
-import net.sf.ehcache.store.compound.ImmutableValueElementCopyStrategy;
import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -92,7 +89,6 @@ import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.test.MiniHCatServer.RUNMODE;
-import org.apache.oozie.test.XTestCase.Predicate;
import org.apache.oozie.test.hive.MiniHS2;
import org.apache.oozie.util.ClasspathUtils;
import org.apache.oozie.util.IOUtils;
@@ -100,11 +96,6 @@ import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
-import com.google.common.base.Enums;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
/**
* Base JUnit <code>TestCase</code> subclass used by all Oozie testcases.
* <p/>
@@ -122,6 +113,7 @@ import com.google.common.collect.Sets;
* From within testcases, system properties must be changed using the {@link #setSystemProperty} method.
*/
public abstract class XTestCase extends TestCase {
+ private static EnumSet<YarnApplicationState> YARN_TERMINAL_STATES = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED);
private Map<String, String> sysProps;
private String testCaseDir;
private String testCaseConfDir;
@@ -1235,48 +1227,44 @@ public abstract class XTestCase extends TestCase {
return services;
}
- protected YarnApplicationState waitUntilYarnAppState(String externalId, final YarnApplicationState... acceptedStates)
+ protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates)
throws HadoopAccessorException, IOException, YarnException {
final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
- final Set<YarnApplicationState> states = Sets.immutableEnumSet(Lists.newArrayList(acceptedStates));
- final MutableBoolean endStateOK = new MutableBoolean(false);
final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>();
JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
- // This is needed here because we need a mutable final YarnClient
- final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null);
+ final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf);
+
try {
- yarnClientMO.setValue(Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf));
waitFor(60 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
- YarnApplicationState state = yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState();
+ YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
finalState.setValue(state);
- if (states.contains(state)) {
- endStateOK.setValue(true);
- return true;
- } else {
- return false;
- }
+ return acceptedStates.contains(state);
}
});
} finally {
- if (yarnClientMO.getValue() != null) {
- yarnClientMO.getValue().close();
+ if (yarnClient != null) {
+ yarnClient.close();
}
}
log.info("Final state is: {0}", finalState.getValue());
- assertTrue(endStateOK.isTrue());
return finalState.getValue();
}
protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
- YarnApplicationState state = waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED);
+ YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
assertEquals("YARN App state", YarnApplicationState.FINISHED, state);
}
+ protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
+ YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
+ assertEquals("YARN App state", YarnApplicationState.KILLED, state);
+ }
+
protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException {
final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
YarnApplicationState state = null;
http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 338bce8..31200af 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -125,7 +125,11 @@ public abstract class LauncherMain {
}
}
- private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+ public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+ return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN);
+ }
+
+ public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) {
System.out.println("Fetching child yarn jobs");
Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
@@ -142,8 +146,9 @@ public abstract class LauncherMain {
}
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
- gar.setScope(ApplicationsRequestScope.OWN);
+ gar.setScope(scope);
gar.setApplicationTags(Collections.singleton(tag));
+
long endTime = System.currentTimeMillis();
if (startTime > endTime) {
System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " +