You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2013/03/15 08:02:48 UTC

svn commit: r1456786 - in /oozie/branches/branch-4.0: ./ core/src/main/java/org/apache/oozie/command/coord/ core/src/test/java/org/apache/oozie/command/coord/

Author: virag
Date: Fri Mar 15 07:02:48 2013
New Revision: 1456786

URL: http://svn.apache.org/r1456786
Log:
OOZIE-1267 Dryrun option for push missing deps (virag)

Added:
    oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
Modified:
    oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    oozie/branches/branch-4.0/release-log.txt

Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1456786&r1=1456785&r2=1456786&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Fri Mar 15 07:02:48 2013
@@ -251,7 +251,7 @@ public class CoordActionInputCheckXComma
         return checkResolvedUris(eAction, existList, nonExistList, conf);
     }
 
-    private boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml.toString());
         LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
         boolean allExist = checkUnresolvedInstances(eAction, conf);

Modified: oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1456786&r1=1456785&r2=1456786&view=diff
==============================================================================
--- oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java (original)
+++ oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java Fri Mar 15 07:02:48 2013
@@ -36,6 +36,7 @@ import org.apache.oozie.coord.CoordUtils
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.dependency.ActionDependency;
 import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.dependency.URIHandler.DependencyType;
@@ -482,23 +483,52 @@ public class CoordCommandUtils {
             return XmlUtils.prettyPrint(eAction).toString();
         }
         else {
-            String action = XmlUtils.prettyPrint(eAction).toString();
-            StringBuilder actionXml = new StringBuilder(action);
-            Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
-            if (actionBean.getPushMissingDependencies() != null) {
-                DependencyChecker.checkForAvailability(actionBean.getPushMissingDependencies(), actionConf, true);
-            }
-            if (actionBean.getMissingDependencies() != null) {
-                CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
-                        actionBean.getJobId());
-                StringBuilder existList = new StringBuilder();
-                StringBuilder nonExistList = new StringBuilder();
-                StringBuilder nonResolvedList = new StringBuilder();
-                getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
-                coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+            return dryRunCoord(eAction, actionBean);
+        }
+    }
+
+    /**
+     * @param eAction the actionXml related element
+     * @param actionBean the coordinator action bean
+     * @return
+     * @throws Exception
+     */
+    static String dryRunCoord(Element eAction, CoordinatorActionBean actionBean) throws Exception {
+        String action = XmlUtils.prettyPrint(eAction).toString();
+        StringBuilder actionXml = new StringBuilder(action);
+        Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+
+        boolean isPushDepAvailable = true;
+        if (actionBean.getPushMissingDependencies() != null) {
+            ActionDependency actionDep = DependencyChecker.checkForAvailability(
+                    actionBean.getPushMissingDependencies(), actionConf, true);
+            if (actionDep.getMissingDependencies().size() != 0) {
+                isPushDepAvailable = false;
             }
-            return actionXml.toString();
+
         }
+        boolean isPullDepAvailable = true;
+        CoordActionInputCheckXCommand coordActionInput = new CoordActionInputCheckXCommand(actionBean.getId(),
+                actionBean.getJobId());
+        if (actionBean.getMissingDependencies() != null) {
+            StringBuilder existList = new StringBuilder();
+            StringBuilder nonExistList = new StringBuilder();
+            StringBuilder nonResolvedList = new StringBuilder();
+            getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
+            isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+        }
+
+        if (isPullDepAvailable && isPushDepAvailable) {
+            // Check for latest/future
+            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf);
+            if (isLatestFutureDepAvailable) {
+                String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
+                        actionBean.getId());
+                actionXml.replace(0, actionXml.length(), newActionXml);
+            }
+        }
+
+        return actionXml.toString();
     }
 
     /**

Added: oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java?rev=1456786&view=auto
==============================================================================
--- oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java (added)
+++ oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java Fri Mar 15 07:02:48 2013
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.text.ParseException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+public class TestCoordCommandUtils extends XDataTestCase {
+    protected Services services;
+
+    protected String getProcessingTZ() {
+        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
+    }
+
+    private String hcatServer;
+
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty(DateUtils.OOZIE_PROCESSING_TIMEZONE_KEY, getProcessingTZ());
+        services = super.setupServicesForHCatalog();
+        services.init();
+        cleanUpDBTables();
+        hcatServer = getMetastoreAuthority();
+    }
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    @Test
+    public void testDryRunPushDependencies() {
+        try {
+            CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                    CoordinatorJob.Status.RUNNING, false, true);
+            Path appPath = new Path(getFsTestCaseDir(), "coord");
+            // actionXml only to check whether coord conf got resolved or not
+            String actionXml = getCoordActionXml(appPath, "coord-action-for-action-input-check.xml");
+            CoordinatorActionBean actionBean = createCoordinatorActionBean(job);
+
+            String db = "default";
+            String table = "tablename";
+            String hcatDependency = getPushMissingDependencies(db, table);
+            actionBean.setPushMissingDependencies(hcatDependency);
+
+            Element eAction = createActionElement(actionXml);
+            String newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+            eAction = XmlUtils.parseXml(newactionXml);
+
+            Element configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            List<?> elementList = configElem.getChildren("property", configElem.getNamespace());
+            Element e1 = (Element) elementList.get(0);
+            Element e2 = (Element) elementList.get(1);
+            // Make sure conf is not resolved as dependencies are not met
+            assertEquals("${coord:dataIn('A')}", e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value", e2.getNamespace()).getValue());
+
+            // Make the dependencies available
+            populateTable(db, table);
+            newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+            eAction = XmlUtils.parseXml(newactionXml);
+            configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            elementList = configElem.getChildren("property", configElem.getNamespace());
+            e1 = (Element) elementList.get(0);
+            e2 = (Element) elementList.get(1);
+            // Check for resolved conf
+            assertEquals(
+                    "file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+                    e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("file://,testDir/2009/29", e2.getChild("value", e1.getNamespace()).getValue());
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testDryRunPullDeps() {
+
+        try {
+            CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+                    CoordinatorJob.Status.RUNNING, false, true);
+
+            Path appPath = new Path(getFsTestCaseDir(), "coord");
+            // actionXml only to check whether coord conf got resolved or not
+            String actionXml = getCoordActionXml(appPath, "coord-action-for-action-input-check.xml");
+
+            CoordinatorActionBean actionBean = createCoordinatorActionBean(job);
+            String testDir = getTestCaseDir();
+            String missDeps = getPullMissingDependencies(testDir);
+            actionBean.setMissingDependencies(missDeps);
+
+            Element eAction = createActionElement(actionXml);
+
+            String newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+            eAction = XmlUtils.parseXml(newactionXml);
+            Element configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            List<?> elementList = configElem.getChildren("property", configElem.getNamespace());
+            Element e1 = (Element) elementList.get(0);
+            Element e2 = (Element) elementList.get(1);
+            // Make sure conf is not resolved as dependencies are not met
+            assertEquals("${coord:dataIn('A')}", e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value", e2.getNamespace()).getValue());
+
+            // Make the dependencies available
+            createDir(testDir + "/2009/29/");
+            createDir(testDir + "/2009/22/");
+            createDir(testDir + "/2009/15/");
+            createDir(testDir + "/2009/08/");
+            sleep(1000);
+
+            newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+            eAction = XmlUtils.parseXml(newactionXml);
+            configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            elementList = configElem.getChildren("property", configElem.getNamespace());
+            e1 = (Element) elementList.get(0);
+            e2 = (Element) elementList.get(1);
+            // Check for resolved conf
+            assertEquals(
+                    "file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+                    e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("file://,testDir/2009/29", e2.getChild("value", e1.getNamespace()).getValue());
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+
+    }
+
+    @Test
+    public void testDryRunPullAndPushDeps() {
+
+        try {
+            CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+                    CoordinatorJob.Status.RUNNING, false, true);
+
+            Path appPath = new Path(getFsTestCaseDir(), "coord");
+            // actionXml only to check whether coord conf got resolved or not
+            String actionXml = getCoordActionXml(appPath, "coord-action-for-action-input-check.xml");
+
+            CoordinatorActionBean actionBean = createCoordinatorActionBean(job);
+            String testDir = getTestCaseDir();
+            String missDeps = getPullMissingDependencies(testDir);
+            actionBean.setMissingDependencies(missDeps);
+
+            String db = "default";
+            String table = "tablename";
+            String hcatDependency = getPushMissingDependencies(db, table);
+
+            actionBean.setPushMissingDependencies(hcatDependency);
+
+            // Make only pull dependencies available
+            createDir(getTestCaseDir() + "/2009/29/");
+            createDir(getTestCaseDir() + "/2009/22/");
+            createDir(getTestCaseDir() + "/2009/15/");
+            createDir(getTestCaseDir() + "/2009/08/");
+            sleep(1000);
+
+            Element eAction = createActionElement(actionXml);
+
+            String newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+            eAction = XmlUtils.parseXml(newactionXml);
+
+            Element configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            List<?> elementList = configElem.getChildren("property", configElem.getNamespace());
+            Element e1 = (Element) elementList.get(0);
+            Element e2 = (Element) elementList.get(1);
+            // Make sure conf is not resolved as pull dependencies are met but
+            // push deps are not met
+            assertEquals("${coord:dataIn('A')}", e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("${coord:dataOut('LOCAL_A')}", e2.getChild("value", e2.getNamespace()).getValue());
+
+            populateTable(db, table);
+            newactionXml = CoordCommandUtils.dryRunCoord(eAction, actionBean);
+
+            eAction = XmlUtils.parseXml(newactionXml);
+            configElem = eAction.getChild("action", eAction.getNamespace())
+                    .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+            elementList = configElem.getChildren("property", configElem.getNamespace());
+            e1 = (Element) elementList.get(0);
+            e2 = (Element) elementList.get(1);
+            // Check for resolved conf
+            assertEquals(
+                    "file://,testDir/2009/29,file://,testDir/2009/22,file://,testDir/2009/15,file://,testDir/2009/08",
+                    e1.getChild("value", e1.getNamespace()).getValue());
+            assertEquals("file://,testDir/2009/29", e2.getChild("value", e1.getNamespace()).getValue());
+
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail(e.getMessage());
+        }
+    }
+
+    private String getPullMissingDependencies(String testDir) {
+        String missDeps = "file://#testDir/2009/29/_SUCCESS#file://#testDir/2009/22/_SUCCESS#file"
+                + "://#testDir/2009/15/_SUCCESS#file://#testDir/2009/08/_SUCCESS";
+        missDeps = missDeps.replaceAll("#testDir", testDir);
+        return missDeps;
+    }
+
+    private String getPushMissingDependencies(String db, String table) throws Exception {
+        String newHCatDependency1 = "hcat://" + hcatServer + "/" + db + "/" + table + "/dt=20120412;country=brazil";
+        String newHCatDependency2 = "hcat://" + hcatServer + "/" + db + "/" + table + "/dt=20120430;country=usa";
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+        dropTable(db, table, true);
+        dropDatabase(db, true);
+        createDatabase(db);
+        createTable(db, table, "dt,country");
+        return newHCatDependency;
+    }
+
+    private Element createActionElement(String actionXml) throws JDOMException, ParseException {
+        Element eAction = XmlUtils.parseXml(actionXml);
+        eAction.removeAttribute("start");
+        eAction.removeAttribute("end");
+        eAction.setAttribute("instance-number", Integer.toString(1));
+        eAction.setAttribute("action-nominal-time",
+                DateUtils.formatDateOozieTZ(DateUtils.parseDateOozieTZ("2009-09-08T01:00Z")));
+        eAction.setAttribute("action-actual-time", DateUtils.formatDateOozieTZ(new Date()));
+        return eAction;
+    }
+
+    private CoordinatorActionBean createCoordinatorActionBean(CoordinatorJob job) throws IOException {
+        CoordinatorActionBean actionBean = new CoordinatorActionBean();
+        String actionId = Services.get().get(UUIDService.class).generateChildId(job.getId(), "1");
+        actionBean.setJobId(job.getId());
+        actionBean.setId(actionId);
+        Configuration jobConf = new XConfiguration(new StringReader(job.getConf()));
+        actionBean.setRunConf(XmlUtils.prettyPrint(jobConf).toString());
+        return actionBean;
+    }
+
+    private void createDir(String dir) {
+        Process pr;
+        try {
+            pr = Runtime.getRuntime().exec("mkdir -p " + dir + "/_SUCCESS");
+            pr.waitFor();
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void populateTable(String db, String table) throws Exception {
+        addPartition(db, table, "dt=20120430;country=usa");
+        addPartition(db, table, "dt=20120412;country=brazil");
+        addPartition(db, table, "dt=20120413;country=brazil");
+    }
+
+}

Modified: oozie/branches/branch-4.0/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1456786&r1=1456785&r2=1456786&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Fri Mar 15 07:02:48 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1267 Dryrun option for push missing deps (virag)
 OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
 OOZIE-1261 Registered push dependencies are not removed on Coord Kill command (virag)
 OOZIE-1191 add examples of coordinator with SLA tag inserted (ryota via mona)