You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:36 UTC

[05/48] oozie git commit: OOZIE-2591 fix TestWorkflowActionKillXCommand and refactor TestJavaActionExecutor

OOZIE-2591 fix TestWorkflowActionKillXCommand and refactor TestJavaActionExecutor

Change-Id: I556684dee7a04a931e6cf1b33de563b7ba4828b2


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/61f3a9f0
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/61f3a9f0
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/61f3a9f0

Branch: refs/heads/oya
Commit: 61f3a9f083b5085bbc575d7e9d251aec03bfcae4
Parents: 9e2acd0
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Thu Nov 10 12:27:08 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Thu Nov 10 16:06:34 2016 +0100

----------------------------------------------------------------------
 .../action/hadoop/TestJavaActionExecutor.java   | 33 +++++++---------
 .../wf/TestWorkflowActionKillXCommand.java      | 37 +++++++++---------
 .../java/org/apache/oozie/test/XTestCase.java   | 40 +++++++-------------
 .../oozie/action/hadoop/LauncherMain.java       |  9 ++++-
 4 files changed, 54 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 5d8bf34..bfc8ab4 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -27,7 +27,6 @@ import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.Writer;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,10 +34,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -63,7 +60,6 @@ import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
@@ -100,7 +96,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethods() throws Exception {
         JavaActionExecutor ae = new JavaActionExecutor();
         assertEquals(Arrays.asList(JavaMain.class), ae.getLauncherClasses());
@@ -365,7 +360,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                 "</java>";
         Context context = createContext(actionXml, null);
         submitAction(context);
-        waitUntilYarnAppState(context.getAction().getExternalId(), YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -385,7 +380,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                 "</java>";
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -410,7 +405,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                 "</java>";
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
         ActionExecutor ae = new JavaActionExecutor();
         try {
             ae.check(context, context.getAction());
@@ -441,7 +436,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         final String runningJobId = submitAction(context);
         ActionExecutor ae = new JavaActionExecutor();
         assertFalse(ae.isCompleted(context.getAction().getExternalStatus()));
-        waitUntilYarnAppState(runningJobId, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJobId);
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
@@ -460,7 +455,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
@@ -481,7 +476,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
       //FIXME  assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
@@ -504,7 +499,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
      //FIXME   assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
@@ -526,7 +521,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
       //FIXME  assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob));
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
@@ -551,7 +546,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
         assertEquals("KILLED", context.getAction().getExternalStatus());
         assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
-        waitUntilYarnAppState(runningJob, YarnApplicationState.KILLED);
+        waitUntilYarnAppKilledAndAssertSuccess(runningJob);
     }
 
 
@@ -827,7 +822,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                 "</java>";
         Context context = createContext(actionXml, null);
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -1876,13 +1871,13 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
 
         // Test when server side setting is not enabled
         JobConf launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf);
-        assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
+        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED)); // disabled by default
 
         ConfigurationService.set("oozie.action.launcher." + JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED, "true");
 
         // Test when server side setting is enabled but tez-site.xml is not in DistributedCache
         launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, actionConf);
-        assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
+        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_TIMELINE_SERVICE_ENABLED));
 
         final Path tezSite = new Path("/tmp/tez-site.xml");
         final FSDataOutputStream out = getFileSystem().create(tezSite);
@@ -2202,7 +2197,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         ConfigurationService.set("oozie.action.sharelib.for.java", "java");
 
         final String runningJob = submitAction(context);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
     }
 
     public void testJobSubmissionWithoutYarnKill() throws Exception {
@@ -2236,7 +2231,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         ConfigurationService.setBoolean(JavaActionExecutor.HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART, false);
 
         final String runningJob = submitAction(context, ae);
-        waitUntilYarnAppState(runningJob, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(runningJob);
     }
 
     public void testDefaultConfigurationInLauncher() throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 72f0114..71b46d1 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -18,19 +18,18 @@
 
 package org.apache.oozie.command.wf;
 
-import java.io.StringReader;
 import java.net.URI;
 import java.util.Date;
+import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.LauncherMain;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.workflow.WorkflowInstance;
 
+import com.google.common.collect.Sets;
+
 public class TestWorkflowActionKillXCommand extends XDataTestCase {
     private Services services;
 
@@ -117,6 +115,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         assertEquals(action.getExternalStatus(), "RUNNING");
     }
 
+    // FIXME - fix JAE.kill()
     public void testWfActionKillChildJob() throws Exception {
         String externalJobID = launchSleepJob(1000);
         String childId = launchSleepJob(1000000);
@@ -126,15 +125,8 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
                 WorkflowAction.Status.KILLED, childId);
 
         new ActionKillXCommand(action.getId()).call();
-        JobClient jobClient = createJobClient();
 
-        final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return mrJob.isComplete();
-            }
-        });
-        assertEquals(mrJob.getJobState(), JobStatus.KILLED);
+        waitUntilYarnAppKilledAndAssertSuccess(childId);
     }
 
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
@@ -189,9 +181,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         SleepJob sleepjob = new SleepJob();
         sleepjob.setConf(jobConf);
         jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
+        jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
+        jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
+        System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+
+        jobClient.submitJob(jobConf);
+        Set<ApplicationId> apps = Sets.newHashSet();
+        apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);
+        assertEquals("Number of YARN apps", apps.size(), 1);
+
+        sleepjob.close();
 
-        final RunningJob runningJob = jobClient.submitJob(jobConf);
-        return runningJob.getID().toString();
+        return apps.iterator().next().toString();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index f0c79b6..fd6d4ad 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -44,10 +43,8 @@ import javax.persistence.FlushModeType;
 import javax.persistence.Query;
 
 import junit.framework.TestCase;
-import net.sf.ehcache.store.compound.ImmutableValueElementCopyStrategy;
 
 import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -92,7 +89,6 @@ import org.apache.oozie.sla.SLARegistrationBean;
 import org.apache.oozie.sla.SLASummaryBean;
 import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.MiniHCatServer.RUNMODE;
-import org.apache.oozie.test.XTestCase.Predicate;
 import org.apache.oozie.test.hive.MiniHS2;
 import org.apache.oozie.util.ClasspathUtils;
 import org.apache.oozie.util.IOUtils;
@@ -100,11 +96,6 @@ import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
 
-import com.google.common.base.Enums;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 /**
  * Base JUnit <code>TestCase</code> subclass used by all Oozie testcases.
  * <p/>
@@ -122,6 +113,7 @@ import com.google.common.collect.Sets;
  * From within testcases, system properties must be changed using the {@link #setSystemProperty} method.
  */
 public abstract class XTestCase extends TestCase {
+    private static EnumSet<YarnApplicationState> YARN_TERMINAL_STATES = EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED);
     private Map<String, String> sysProps;
     private String testCaseDir;
     private String testCaseConfDir;
@@ -1235,48 +1227,44 @@ public abstract class XTestCase extends TestCase {
         return services;
     }
 
-    protected YarnApplicationState waitUntilYarnAppState(String externalId, final YarnApplicationState... acceptedStates)
+    protected YarnApplicationState waitUntilYarnAppState(String externalId, final EnumSet<YarnApplicationState> acceptedStates)
             throws HadoopAccessorException, IOException, YarnException {
         final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
-        final Set<YarnApplicationState> states = Sets.immutableEnumSet(Lists.newArrayList(acceptedStates));
-        final MutableBoolean endStateOK = new MutableBoolean(false);
         final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>();
 
         JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
-        // This is needed here because we need a mutable final YarnClient
-        final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null);
+        final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf);
+
         try {
-            yarnClientMO.setValue(Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf));
             waitFor(60 * 1000, new Predicate() {
                 @Override
                 public boolean evaluate() throws Exception {
-                     YarnApplicationState state = yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState();
+                     YarnApplicationState state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
                      finalState.setValue(state);
 
-                     if (states.contains(state)) {
-                         endStateOK.setValue(true);
-                         return true;
-                     } else {
-                         return false;
-                     }
+                     return acceptedStates.contains(state);
                 }
             });
         } finally {
-            if (yarnClientMO.getValue() != null) {
-                yarnClientMO.getValue().close();
+            if (yarnClient != null) {
+                yarnClient.close();
             }
         }
 
         log.info("Final state is: {0}", finalState.getValue());
-        assertTrue(endStateOK.isTrue());
         return finalState.getValue();
     }
 
     protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
-        YarnApplicationState state = waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED);
+        YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
         assertEquals("YARN App state", YarnApplicationState.FINISHED, state);
     }
 
+    protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException {
+        YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES);
+        assertEquals("YARN App state", YarnApplicationState.KILLED, state);
+    }
+
     protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException {
         final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
         YarnApplicationState state = null;

http://git-wip-us.apache.org/repos/asf/oozie/blob/61f3a9f0/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 338bce8..31200af 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -125,7 +125,11 @@ public abstract class LauncherMain {
         }
     }
 
-    private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+    public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+        return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN);
+    }
+
+    public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) {
         System.out.println("Fetching child yarn jobs");
         Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
         String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
@@ -142,8 +146,9 @@ public abstract class LauncherMain {
         }
 
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
-        gar.setScope(ApplicationsRequestScope.OWN);
+        gar.setScope(scope);
         gar.setApplicationTags(Collections.singleton(tag));
+
         long endTime = System.currentTimeMillis();
         if (startTime > endTime) {
             System.out.println("WARNING: Clock skew between the Oozie server host and this host detected.  Please fix this.  " +