You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/06/20 10:08:41 UTC

oozie git commit: OOZIE-2796 oozie.action.keep.action.dir not getting notice (zgengxb2005 via gezapeti)

Repository: oozie
Updated Branches:
  refs/heads/master 72bce837d -> 8bb40f3fa


OOZIE-2796 oozie.action.keep.action.dir not getting notice (zgengxb2005 via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8bb40f3f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8bb40f3f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8bb40f3f

Branch: refs/heads/master
Commit: 8bb40f3fa88c1d17984108a8ec0ff56fd24800f6
Parents: 72bce83
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Tue Jun 20 12:08:32 2017 +0200
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Tue Jun 20 12:08:32 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/oozie/WorkflowJobBean.java  |  2 +-
 .../oozie/action/hadoop/FsActionExecutor.java   |  3 +-
 .../oozie/action/hadoop/JavaActionExecutor.java |  3 +-
 .../oozie/command/wf/ActionKillXCommand.java    |  2 +-
 .../apache/oozie/command/wf/WfEndXCommand.java  | 45 +++++++++++++++-----
 .../oozie/command/wf/WorkflowXCommand.java      |  4 ++
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  1 +
 .../jpa/TestWorkflowJobQueryExecutor.java       |  3 +-
 release-log.txt                                 |  1 +
 9 files changed, 48 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index 2042063..028164d 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -101,7 +101,7 @@ import java.util.List;
 
     @NamedQuery(name = "GET_WORKFLOW_ACTION_OP", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.run, w.parentId, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),
 
-    @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml from WorkflowJobBean w where w.id = :id"),
+    @NamedQuery(name = "GET_WORKFLOW_KILL", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.slaXml, w.protoActionConf from WorkflowJobBean w where w.id = :id"),
 
     @NamedQuery(name = "GET_WORKFLOW_RESUME", query = "select w.id, w.user, w.group, w.appName, w.appPath, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance, w.protoActionConf from WorkflowJobBean w where w.id = :id"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
index 85d3efa..0515db6 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/FsActionExecutor.java
@@ -41,6 +41,7 @@ import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.wf.WorkflowXCommand;
 import org.apache.oozie.dependency.FSURIHandler;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.service.ConfigurationService;
@@ -630,7 +631,7 @@ public class FsActionExecutor extends ActionExecutor {
         WorkflowAction.Status status = externalStatus.equals("OK") ? WorkflowAction.Status.OK :
                                        WorkflowAction.Status.ERROR;
         context.setEndData(status, getActionSignal(status));
-        if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)) {
+        if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false)) {
             try {
                 FileSystem fs = context.getAppFileSystem();
                 fs.delete(context.getActionDir(), true);

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index bf02397..465cd9e 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -84,6 +84,7 @@ import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.wf.WorkflowXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -511,7 +512,7 @@ public class JavaActionExecutor extends ActionExecutor {
     void cleanUpActionDir(FileSystem actionFs, Context context) throws ActionExecutorException {
         try {
             Path actionDir = context.getActionDir();
-            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
+            if (!context.getProtoActionConf().getBoolean(WorkflowXCommand.KEEP_WF_ACTION_DIR, false)
                     && actionFs.exists(actionDir)) {
                 actionFs.delete(actionDir, true);
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
index ac096cc..61891b8 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
@@ -208,7 +208,7 @@ public class ActionKillXCommand extends ActionXCommand<Void> {
             FileSystem actionFs = context.getAppFileSystem();
             Path actionDir = context.getActionDir();
             Path jobDir = actionDir.getParent();
-            if (!context.getProtoActionConf().getBoolean("oozie.action.keep.action.dir", false)
+            if (!context.getProtoActionConf().getBoolean(KEEP_WF_ACTION_DIR, false)
                     && actionFs.exists(actionDir)) {
                 actionFs.delete(actionDir, true);
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
index e282d94..0531bb3 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/WfEndXCommand.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.command.wf;
 
 import java.io.IOException;
+import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 
@@ -26,12 +27,16 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.util.XConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * This Command is expected to be called when a Workflow moves to any terminal
@@ -40,40 +45,58 @@ import org.apache.oozie.service.Services;
  */
 public class WfEndXCommand extends WorkflowXCommand<Void> {
 
-    private WorkflowJob job = null;
+    private WorkflowJobBean jobBean = null;
 
-    public WfEndXCommand(WorkflowJob job) {
+    public WfEndXCommand(WorkflowJobBean jobBean) {
         super("wf_end", "wf_end", 1);
-        this.job = job;
+        this.jobBean = jobBean;
     }
 
     @Override
     protected Void execute() throws CommandException {
-        LOG.debug("STARTED WFEndXCommand " + job.getId());
+        LOG.debug("STARTED WFEndXCommand " + jobBean.getId());
         deleteWFDir();
-        LOG.debug("ENDED WFEndXCommand " + job.getId());
+        LOG.debug("ENDED WFEndXCommand " + jobBean.getId());
         return null;
     }
 
     private void deleteWFDir() throws CommandException {
         FileSystem fs;
         try {
-            fs = getAppFileSystem(job);
-            String wfDir = Services.get().getSystemId() + "/" + job.getId();
+            fs = getAppFileSystem(jobBean);
+            String wfDir = Services.get().getSystemId() + "/" + jobBean.getId();
             Path wfDirPath = new Path(fs.getHomeDirectory(), wfDir);
+
             LOG.debug("WF tmp dir :" + wfDirPath);
-            if (fs.exists(wfDirPath)) {
+            boolean keepActionDir = keepWfActionDir();
+            if (!keepActionDir && fs.exists(wfDirPath)) {
                 fs.delete(wfDirPath, true);
             }
+            else if (keepActionDir) {
+                LOG.debug(KEEP_WF_ACTION_DIR + " is set to true");
+            }
             else {
                 LOG.debug("Tmp dir doesn't exist :" + wfDirPath);
             }
         }
         catch (Exception e) {
-            LOG.error("Unable to delete WF temp dir of wf id :" + job.getId(), e);
-            throw new CommandException(ErrorCode.E0819, job.getId(), e);
+            LOG.error("Unable to delete WF temp dir of wf id :" + jobBean.getId(), e);
+            throw new CommandException(ErrorCode.E0819, jobBean.getId(), e);
         }
+    }
+
+    @VisibleForTesting
+    protected boolean keepWfActionDir() throws IOException {
+        if (jobBean.getProtoActionConf() == null) {
+            return false;
+        }
+        Configuration wfConf = getWfConfiguration();
+        return wfConf.getBoolean(KEEP_WF_ACTION_DIR, false);
+    }
 
+    @VisibleForTesting
+    protected Configuration getWfConfiguration() throws IOException {
+        return new XConfiguration(new StringReader(jobBean.getProtoActionConf()));
     }
 
     protected FileSystem getAppFileSystem(WorkflowJob workflow) throws HadoopAccessorException, IOException,
@@ -86,7 +109,7 @@ public class WfEndXCommand extends WorkflowXCommand<Void> {
 
     @Override
     public String getEntityKey() {
-        return job.getId();
+        return jobBean.getId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
index bc80dfe..87d7e77 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
@@ -34,6 +34,10 @@ import org.apache.oozie.event.WorkflowJobEvent;
  */
 public abstract class WorkflowXCommand<T> extends XCommand<T> {
 
+    // Configuration on whether or not workflow and action directory will be deleted
+    // after workflow is done.
+    public static final String KEEP_WF_ACTION_DIR = "oozie.action.keep.action.dir";
+
     protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
     protected static final String INSTR_KILLED_JOBS_COUNTER_NAME = "killed";
     protected static final String INSTR_FAILED_JOBS_COUNTER_NAME = "failed";

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index 13fa54d..aa622d0 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -295,6 +295,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor
                 bean.setLogToken((String) arr[9]);
                 bean.setWfInstanceBlob((BinaryBlob) (arr[10]));
                 bean.setSlaXmlBlob((StringBlob) arr[11]);
+                bean.setProtoActionConfBlob((StringBlob) arr[12]);
                 break;
             case GET_WORKFLOW_RESUME:
                 bean = new WorkflowJobBean();

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
index c01d3d5..fadd8b9 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobQueryExecutor.java
@@ -194,6 +194,7 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase {
 
     public void testGet() throws Exception {
         WorkflowJobBean bean = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
+        assertNotNull(bean.getProtoActionConf());
         bean.setStartTime(new Date(System.currentTimeMillis() - 10));
         bean.setEndTime(new Date());
         WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW, bean);
@@ -301,7 +302,7 @@ public class TestWorkflowJobQueryExecutor extends XDataTestCase {
         assertEquals(ByteBuffer.wrap(bean.getWfInstanceBlob().getBytes()).getInt(),
                 ByteBuffer.wrap(retBean.getWfInstanceBlob().getBytes()).getInt());
         assertEquals(bean.getSlaXml(), retBean.getSlaXml());
-        assertNull(retBean.getProtoActionConf());
+        assertEquals(bean.getProtoActionConf(), retBean.getProtoActionConf());
         assertNull(retBean.getConf());
 
         // GET_WORKFLOW_RESUME

http://git-wip-us.apache.org/repos/asf/oozie/blob/8bb40f3f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 05ed49c..cfc94e9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2796 oozie.action.keep.action.dir not getting notice (zgengxb2005 via gezapeti)
 OOZIE-2769 Extend FS action to allow setrep on a file (Artem Ervits via gezapeti)
 OOZIE-2815 amend - Oozie not always display job log (andras.piros via gezapeti)
 OOZIE-2946 Include find-sec-bugs plugin (Jan Hentschel via rkanter)