You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 19:46:51 UTC

oozie git commit: OOZIE-2444 Need conditional logic in bundles

Repository: oozie
Updated Branches:
  refs/heads/master 70052969a -> 5abd3e6a5


OOZIE-2444 Need conditional logic in bundles


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5abd3e6a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5abd3e6a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5abd3e6a

Branch: refs/heads/master
Commit: 5abd3e6a598bf073b1b0aded878d746ccb482da0
Parents: 7005296
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 10:46:45 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 10:46:45 2016 -0800

----------------------------------------------------------------------
 client/src/main/resources/oozie-bundle-0.2.xsd  |  1 +
 .../command/bundle/BundleStartXCommand.java     | 34 ++++++++
 .../command/bundle/TestBundleStartXCommand.java | 28 ++++++
 .../org/apache/oozie/test/XDataTestCase.java    | 91 ++++++++++++++++++++
 core/src/test/resources/bundle-submit-job.xml   |  2 +-
 docs/src/site/twiki/BundleFunctionalSpec.twiki  |  7 +-
 release-log.txt                                 |  1 +
 7 files changed, 161 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/client/src/main/resources/oozie-bundle-0.2.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-bundle-0.2.xsd b/client/src/main/resources/oozie-bundle-0.2.xsd
index b4e37e5..5668a92 100644
--- a/client/src/main/resources/oozie-bundle-0.2.xsd
+++ b/client/src/main/resources/oozie-bundle-0.2.xsd
@@ -58,6 +58,7 @@
         </xs:sequence>
         <xs:attribute name="name" type="xs:string" use="required"/>
         <xs:attribute name="critical" type="xs:string" use="optional"/>
+        <xs:attribute name="enabled" type="xs:string" use="optional"/>
     </xs:complexType>
     <xs:complexType name="CONFIGURATION">
         <xs:sequence>

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
index e026efb..27ae4a4 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStartXCommand.java
@@ -174,6 +174,11 @@ public class BundleStartXCommand extends StartTransitionXCommand {
                         if (map.containsKey(name.getValue())) {
                             throw new CommandException(ErrorCode.E1304, name);
                         }
+                        Configuration coordConf = mergeConfig(elem);
+                        // skip coord job if it is not enabled
+                        if (!isEnabled(elem, coordConf)) {
+                            continue;
+                        }
                         boolean isCritical = false;
                         if (critical != null && Boolean.parseBoolean(critical.getValue())) {
                             isCritical = true;
@@ -249,6 +254,10 @@ public class BundleStartXCommand extends StartTransitionXCommand {
                     if (OozieJobInfo.isJobInfoEnabled()) {
                         coordConf.set(OozieJobInfo.BUNDLE_NAME, bundleJob.getAppName());
                     }
+                    // skip coord job if it is not enabled
+                    if (!isEnabled(coordElem, coordConf)) {
+                        continue;
+                    }
                     String coordName=name.getValue();
                     try {
                         coordName = ELUtils.resolveAppName(coordName, coordConf);
@@ -350,4 +359,29 @@ public class BundleStartXCommand extends StartTransitionXCommand {
     public void updateJob() throws CommandException {
         updateList.add(new UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob));
     }
+
+    /**
+     * Checks whether the coordinator is enabled, based on its optional "enabled" attribute.
+     *
+     * @param coordElem coordinator element whose "enabled" attribute is read
+     * @param coordConf configuration passed to ELUtils.resolveAppName when resolving the attribute value
+     * @return true if the attribute is absent or its resolved value parses as true, otherwise false.
+     * @throws CommandException with ErrorCode.E1321 if resolving the attribute value throws any exception
+     */
+    private boolean isEnabled(Element coordElem, Configuration coordConf) throws CommandException {
+        Attribute enabled = coordElem.getAttribute("enabled");
+        if (enabled == null) {
+            // no "enabled" attribute on the element: the coordinator is enabled by default
+            return true;
+        }
+        String isEnabled = enabled.getValue();
+        try {
+            isEnabled = ELUtils.resolveAppName(isEnabled, coordConf); // value may be an expression such as ${isEnabled}
+        }
+        catch (Exception e) {
+            throw new CommandException(ErrorCode.E1321, e.getMessage(), e);
+        }
+        return Boolean.parseBoolean(isEnabled); // anything other than "true" (ignoring case) means disabled
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
index bcd7d24..cfb8b4e 100644
--- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleStartXCommand.java
@@ -155,6 +155,34 @@ public class TestBundleStartXCommand extends XDataTestCase {
     }
 
     /**
+     * Test : Start bundle job when certain coord jobs are not enabled; expects a single bundle action
+     *
+     * @throws Exception if any step of the test fails unexpectedly
+     */
+    public void testBundleStart3() throws Exception {
+        BundleJobBean job = this.addRecordToBundleJobTableDisabledCoord(Job.Status.PREP);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        BundleJobGetJPAExecutor bundleJobGetExecutor = new BundleJobGetJPAExecutor(job.getId());
+        job = jpaService.execute(bundleJobGetExecutor);
+        assertEquals(job.getStatus(), Job.Status.PREP); // the job is still PREP before it is started
+
+        new BundleStartXCommand(job.getId()).call();
+
+        job = jpaService.execute(bundleJobGetExecutor); // re-read the job after the start command
+        assertEquals(job.getStatus(), Job.Status.RUNNING);
+
+        sleep(2000); // NOTE(review): presumably gives asynchronously queued work time to finish - confirm
+
+        List<BundleActionBean> actions = BundleActionQueryExecutor.getInstance()
+                .getList(BundleActionQuery.GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE, job.getId());
+
+        assertEquals(1, actions.size()); // exactly one action is expected; the disabled coordinator adds none
+        assertEquals(job.getId(), actions.get(0).getBundleId());
+    }
+
+    /**
      * Test : Start bundle job with dryrun
      *
      * @throws Exception

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 4c45dca..a9aa79a 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1030,6 +1030,95 @@ public abstract class XDataTestCase extends XHCatTestCase {
     }
 
     /**
+     * Insert a bundle job created by createBundleJobCoordDisabled(jobStatus) through JPAService, for testing.
+     *
+     * @param jobStatus job status
+     * @return bundle job bean
+     * @throws Exception if the bean cannot be created; a JPAExecutorException raised by the insert is
+     *         printed and reported through fail()
+     */
+    protected BundleJobBean addRecordToBundleJobTableDisabledCoord(Job.Status jobStatus) throws Exception {
+        BundleJobBean bundle = createBundleJobCoordDisabled(jobStatus);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundle);
+            jpaService.execute(bundleInsertjpa);
+        }
+        catch (JPAExecutorException ce) {
+            ce.printStackTrace();
+            fail("Unable to insert the test bundle job record to table");
+            throw ce; // NOTE(review): fail() normally throws, so this rethrow looks unreachable at runtime - confirm
+        }
+        return bundle;
+    }
+
+    /**
+     * Creates bundle job bean with one disabled coordinator, using a bundle job id generated by UUIDService
+     *
+     * @param jobStatus job status
+     * @return bundle job bean
+     * @throws Exception if createBundleJobCoordDisabled(String, Job.Status) fails
+     */
+    protected BundleJobBean createBundleJobCoordDisabled(Job.Status jobStatus) throws Exception {
+        return createBundleJobCoordDisabled(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE),
+                jobStatus);
+    }
+
+    /**
+     * Creates bundle job bean with one disabled coordinator (job configuration sets isEnabled to false)
+     *
+     * @param jobID id set on the returned bundle job bean
+     * @param jobStatus job status
+     * @return bundle job bean
+     * @throws Exception if the coordinator or bundle application files cannot be prepared
+     */
+    protected BundleJobBean createBundleJobCoordDisabled(String jobID, Job.Status jobStatus) throws Exception {
+        Path coordPath1 = new Path(getFsTestCaseDir(), "coord1");
+        Path coordPath2 = new Path(getFsTestCaseDir(), "coord2");
+        writeCoordXml(coordPath1, "coord-job-bundle.xml");
+        writeCoordXml(coordPath2, "coord-job-bundle.xml");
+
+        Path bundleAppPath = new Path(getFsTestCaseDir(), "bundle");
+        String bundleAppXml = getBundleXml("bundle-submit-job.xml");
+        assertNotNull(bundleAppXml);
+        assertTrue(bundleAppXml.length() > 0);
+
+        bundleAppXml = bundleAppXml.replaceAll("#app_path1", // fill the #app_path placeholders of the template
+                Matcher.quoteReplacement(new Path(coordPath1.toString(), "coordinator.xml").toString()));
+        bundleAppXml = bundleAppXml.replaceAll("#app_path2",
+                Matcher.quoteReplacement(new Path(coordPath2.toString(), "coordinator.xml").toString()));
+
+        writeToFile(bundleAppXml, bundleAppPath, "bundle.xml");
+
+        Configuration conf = new XConfiguration();
+        conf.set(OozieClient.BUNDLE_APP_PATH, bundleAppPath.toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("jobTracker", getJobTrackerUri());
+        conf.set("nameNode", getNameNodeUri());
+        conf.set("appName", "bundle-app-name");
+        conf.set("coordName1", "coord1");
+        conf.set("coordName2", "coord2");
+        conf.set("isEnabled", "false"); // bundle-submit-job.xml uses ${isEnabled} as the second coordinator's "enabled" value
+
+        BundleJobBean bundle = new BundleJobBean();
+        bundle.setId(jobID);
+        bundle.setAppName("BUNDLE-TEST");
+        bundle.setAppPath(bundleAppPath.toString());
+        bundle.setConf(XmlUtils.prettyPrint(conf).toString());
+        bundle.setConsoleUrl("consoleUrl");
+        bundle.setCreatedTime(new Date());
+        bundle.setJobXml(bundleAppXml);
+        bundle.setLastModifiedTime(new Date());
+        bundle.setOrigJobXml(bundleAppXml);
+        bundle.resetPending();
+        bundle.setStatus(jobStatus);
+        bundle.setUser(conf.get(OozieClient.USER_NAME));
+        bundle.setGroup(conf.get(OozieClient.GROUP_NAME)); // GROUP_NAME is not set on conf in this method
+        return bundle;
+    }
+
+    /**
      * Create bundle action bean and save to db
      *
      * @param jobId bundle job id
@@ -1409,6 +1498,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
         conf.set("appName", "bundle-app-name");
         conf.set("coordName1", "coord1");
         conf.set("coordName2", "coord2");
+        conf.set("isEnabled", "true");
 
         BundleJobBean bundle = new BundleJobBean();
         bundle.setId(jobID);
@@ -1478,6 +1568,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
         conf.set("coordName1", "coord1");
         conf.set("coordName2", "coord2");
         conf.set("coord1.starttime","2009-02-01T00:00Z");
+        conf.set("isEnabled", "true");
 
         BundleJobBean bundle = new BundleJobBean();
         bundle.setId(Services.get().get(UUIDService.class).generateId(ApplicationType.BUNDLE));

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/core/src/test/resources/bundle-submit-job.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/bundle-submit-job.xml b/core/src/test/resources/bundle-submit-job.xml
index 6eda402..18ebda6 100644
--- a/core/src/test/resources/bundle-submit-job.xml
+++ b/core/src/test/resources/bundle-submit-job.xml
@@ -40,7 +40,7 @@
                      </property>
                  </configuration>
           </coordinator>
-          <coordinator name='${coordName2}' critical='false'>
+          <coordinator name='${coordName2}' critical='false' enabled='${isEnabled}'>
                  <app-path>#app_path2</app-path>
                  <configuration>
                      <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/docs/src/site/twiki/BundleFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/BundleFunctionalSpec.twiki b/docs/src/site/twiki/BundleFunctionalSpec.twiki
index c9252c0..c4e1e36 100644
--- a/docs/src/site/twiki/BundleFunctionalSpec.twiki
+++ b/docs/src/site/twiki/BundleFunctionalSpec.twiki
@@ -94,7 +94,9 @@ A bundle definition is defined in XML by a name, controls and one or more coordi
     * *%BLUE% controls: %ENDCOLOR%* The control specification for the bundle.
       * *%BLUE% kick-off-time: %ENDCOLOR%* It defines when the bundle job should start and submit the coordinator applications. This field is optional and the default is *NOW* that means the job should start right-a-way.
    * *%BLUE% coordinator: %ENDCOLOR%* Coordinator application specification. There should be at least one coordinator application in any bundle.
-      * *%BLUE% name: %ENDCOLOR%* Name of the coordinator application. It can be used for referring this application through bundle to control such as kill, suspend, rerun.
+      * *%BLUE% name: %ENDCOLOR%* Name of the coordinator application. It can be used to refer to this application through
+      the bundle for control operations such as kill, suspend, and rerun. The =enabled= attribute can be used to enable or
+      disable a coordinator. It is optional, and its default value is true.
       * *%BLUE% app-path: %ENDCOLOR%* Path of the coordinator application definition in hdfs. This is a mandatory element.
       * *%BLUE% configuration: %ENDCOLOR%* A hadoop like configuration to parameterize corresponding coordinator application. This is optional.
     * *%BLUE% Parameterization: %ENDCOLOR%*  Configuration properties that are a valid Java identifier, [A-Za-z_][0-9A-Za-z_]*, are available as =${NAME}= variables within the bundle application definition. Configuration properties that are not a valid Java identifier, for example =job.tracker=, are available via the =${bundle:conf(String name)}= function. Valid Java identifier properties are available via this function as well.
@@ -107,7 +109,7 @@ A bundle definition is defined in XML by a name, controls and one or more coordi
   <controls>
        <kick-off-time>[DATETIME]</kick-off-time>
   </controls>
-   <coordinator name=[NAME] >
+   <coordinator name=[NAME] enabled=[TRUE | FALSE] >
        <app-path>[COORD-APPLICATION-PATH]</app-path>
           <configuration>
             <property>
@@ -386,6 +388,7 @@ Refer to the [[DG_CoordinatorRerun][Rerunning Coordinator Actions]] for details
         </xs:sequence>
         <xs:attribute name="name" type="bundle:IDENTIFIER" use="required"/>
         <xs:attribute name="critical" type="xs:string" use="optional"/>
+        <xs:attribute name="enabled" type="xs:string" use="optional"/>
     </xs:complexType>
     <xs:complexType name="CONFIGURATION">
         <xs:sequence>

http://git-wip-us.apache.org/repos/asf/oozie/blob/5abd3e6a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1525ae4..5c2ee5b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2444 Need conditional logic in bundles (satishsaley via puru)
 OOZIE-2394 Oozie can execute command without holding lock (puru)
 OOZIE-1922 MemoryLocksService fails if lock is acquired multiple times in same thread and released (puru)
 OOZIE-2432 TestPurgeXCommand fails (fdenes via rkanter)