You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/05/08 10:40:14 UTC

[08/37] oozie git commit: OOZIE-2041 Add an admin command to run the PurgeXCommand (abhishekbafna)

OOZIE-2041 Add an admin command to run the PurgeXCommand (abhishekbafna)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9b07ee82
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9b07ee82
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9b07ee82

Branch: refs/heads/oya
Commit: 9b07ee82dddaab0ff5562a84b7f5230d11315e9c
Parents: 5ed5967
Author: abhisek bafna <ab...@hortonworks.com>
Authored: Wed Mar 22 20:17:40 2017 +0530
Committer: abhisek bafna <ab...@hortonworks.com>
Committed: Wed Mar 22 20:17:40 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/oozie/cli/OozieCLI.java     |  7 ++
 .../org/apache/oozie/cli/ValidationUtil.java    | 30 ++++++++
 .../org/apache/oozie/client/OozieClient.java    | 69 ++++++++++++++++++
 .../org/apache/oozie/client/rest/JsonTags.java  |  1 +
 .../apache/oozie/client/rest/RestConstants.java |  6 ++
 .../org/apache/oozie/command/PurgeXCommand.java |  6 +-
 .../org/apache/oozie/service/PurgeService.java  |  1 +
 .../apache/oozie/servlet/BaseAdminServlet.java  | 73 +++++++++++++++++---
 .../apache/oozie/servlet/V0AdminServlet.java    | 16 ++---
 .../apache/oozie/servlet/V1AdminServlet.java    | 41 ++++-------
 core/src/main/resources/oozie-default.xml       |  8 +++
 .../org/apache/oozie/client/TestOozieCLI.java   | 55 +++++++++++++++
 .../apache/oozie/servlet/TestAdminServlet.java  | 32 ++++++++-
 .../oozie/servlet/TestV1AdminServlet.java       | 13 ++++
 docs/src/site/twiki/DG_CommandLineTool.twiki    | 21 ++++++
 docs/src/site/twiki/WebServicesAPI.twiki        | 28 ++++++++
 release-log.txt                                 |  1 +
 17 files changed, 354 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
index b0b87ad..38fb84e 100644
--- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
+++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
@@ -151,6 +151,7 @@ public class OozieCLI {
     public static final String UPDATE_SHARELIB_OPTION = "sharelibupdate";
 
     public static final String LIST_SHARELIB_LIB_OPTION = "shareliblist";
+    public static final String PURGE_OPTION = "purge";
 
     public static final String SLA_DISABLE_ALERT = "sladisable";
     public static final String SLA_ENABLE_ALERT = "slaenable";
@@ -279,6 +280,8 @@ public class OozieCLI {
         Option sharelib = new Option(LIST_SHARELIB_LIB_OPTION, false,
                 "List available sharelib that can be specified in a workflow action");
         sharelib.setOptionalArg(true);
+        Option purge = new Option(PURGE_OPTION, true, "purge old oozie workflow, coordinator and bundle records from DB " +
+                "(parameter unit: day)");
 
         Options adminOptions = new Options();
         adminOptions.addOption(oozie);
@@ -296,6 +299,7 @@ public class OozieCLI {
         group.addOption(javaSysProps);
         group.addOption(metrics);
         group.addOption(instrumentation);
+        group.addOption(purge);
         adminOptions.addOptionGroup(group);
         addAuthOptions(adminOptions);
         return adminOptions;
@@ -1949,6 +1953,9 @@ public class OozieCLI {
                 } else {
                     printInstrumentation(instrumentation);
                 }
+            } else if (options.contains(PURGE_OPTION)) {
+                String purgeOptions = commandLine.getOptionValue(PURGE_OPTION);
+                System.out.println(wc.purgeCommand(purgeOptions));
             }
         }
         catch (OozieClientException ex) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/client/src/main/java/org/apache/oozie/cli/ValidationUtil.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/cli/ValidationUtil.java b/client/src/main/java/org/apache/oozie/cli/ValidationUtil.java
new file mode 100644
index 0000000..b1da2af
--- /dev/null
+++ b/client/src/main/java/org/apache/oozie/cli/ValidationUtil.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.cli;
+
+public class ValidationUtil {
+
+    public static int parsePositiveInteger(String iStr) {
+        int value = Integer.parseInt(iStr);
+        if (value < 0) {
+            throw new IllegalArgumentException("Input value should be a positive integer. Value: " + iStr);
+        }
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/client/src/main/java/org/apache/oozie/client/OozieClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java
index 84f41c9..7370808 100644
--- a/client/src/main/java/org/apache/oozie/client/OozieClient.java
+++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java
@@ -20,6 +20,7 @@ package org.apache.oozie.client;
 
 import com.google.common.collect.Lists;
 import org.apache.oozie.BuildInfo;
+import org.apache.oozie.cli.ValidationUtil;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.JsonToBean;
 import org.apache.oozie.client.rest.RestConstants;
@@ -2289,6 +2290,74 @@ public class OozieClient {
 
     }
 
+    public String purgeCommand(String purgeOptions) throws OozieClientException {
+        String workflowAge = "";
+        String coordAge = "";
+        String bundleAge = "";
+        String purgeLimit = "";
+        String oldCoordAction = "";
+        if (purgeOptions != null) {
+            String options[] = purgeOptions.split(";");
+            for (String option : options) {
+                String pair[] = option.split("=");
+                if (pair.length < 2) {
+                    throw new OozieClientException(OozieClientException.INVALID_INPUT,
+                            "Invalid purge option pair [" + option + "] specified.");
+                }
+                String key = pair[0].toLowerCase();
+                String value = pair[1];
+                switch (key) {
+                    case "wf":
+                        workflowAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
+                        break;
+                    case "coord":
+                        coordAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
+                        break;
+                    case "bundle":
+                        bundleAge = String.valueOf(ValidationUtil.parsePositiveInteger(value));
+                        break;
+                    case "limit":
+                        purgeLimit = String.valueOf(ValidationUtil.parsePositiveInteger(value));
+                        break;
+                    case "oldcoordaction":
+                        oldCoordAction = value;
+                        break;
+                    default:
+                        throw new OozieClientException(OozieClientException.INVALID_INPUT,
+                                "Invalid purge option [" + key + "] specified.");
+                }
+            }
+        }
+        PurgeCommand purgeCommand = new PurgeCommand(workflowAge, coordAge, bundleAge, purgeLimit, oldCoordAction);
+        return purgeCommand.call();
+    }
+
+    private class PurgeCommand extends ClientCallable<String> {
+
+        PurgeCommand(String workflowAge, String coordAge, String bundleAge, String purgeLimit, String oldCoordAction) {
+            super("PUT", RestConstants.ADMIN, RestConstants.ADMIN_PURGE, prepareParams(
+                    RestConstants.PURGE_WF_AGE, workflowAge,
+                    RestConstants.PURGE_COORD_AGE, coordAge,
+                    RestConstants.PURGE_BUNDLE_AGE, bundleAge,
+                    RestConstants.PURGE_LIMIT, purgeLimit,
+                    RestConstants.PURGE_OLD_COORD_ACTION, oldCoordAction
+            ));
+        }
+
+        @Override
+        protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
+            if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
+                Reader reader = new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8);
+                JSONObject jsonObject = (JSONObject) JSONValue.parse(reader);
+                Object msg = jsonObject.get(JsonTags.PURGE);
+                return msg.toString();
+            } else {
+                handleError(conn);
+            }
+            return null;
+        }
+    }
+
     /**
      * Return the Oozie server build version.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
index 8f220b8..ca168e0 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
@@ -252,4 +252,5 @@ public interface JsonTags {
     String COORD_ACTION_FIRST_MISSING_DEPENDENCIES = "blockedOn";
 
 
+    String PURGE = "purge";
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index 4e38b4a..8ddb1f8 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -219,4 +219,10 @@ public interface RestConstants {
 
     public static final String COORD_ACTION_MISSING_DEPENDENCIES = "missing-dependencies";
 
+    String ADMIN_PURGE = "purge";
+    String PURGE_WF_AGE = "wf";
+    String PURGE_COORD_AGE = "coord";
+    String PURGE_BUNDLE_AGE = "bundle";
+    String PURGE_LIMIT = "limit";
+    String PURGE_OLD_COORD_ACTION = "oldcoordaction";
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
index ab06fdf..2ba1999 100644
--- a/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/PurgeXCommand.java
@@ -142,7 +142,7 @@ public class PurgeXCommand extends XCommand<Void> {
     @Override
     protected Void execute() throws CommandException {
         LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
-                + "jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
+                + " jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
 
         // Process parentless workflows to purge them and their children
         if (!wfList.isEmpty()) {
@@ -407,7 +407,7 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     @Override
     public String getEntityKey() {
-        return null;
+        return "purge_command";
     }
 
     /* (non-Javadoc)
@@ -415,7 +415,7 @@ public class PurgeXCommand extends XCommand<Void> {
      */
     @Override
     protected boolean isLockRequired() {
-        return false;
+        return true;
     }
 
     /* (non-Javadoc)

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/java/org/apache/oozie/service/PurgeService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/PurgeService.java b/core/src/main/java/org/apache/oozie/service/PurgeService.java
index 6e4a8e8..c2c966b 100644
--- a/core/src/main/java/org/apache/oozie/service/PurgeService.java
+++ b/core/src/main/java/org/apache/oozie/service/PurgeService.java
@@ -39,6 +39,7 @@ public class PurgeService implements Service {
      */
     public static final String CONF_PURGE_INTERVAL = CONF_PREFIX + "purge.interval";
     public static final String PURGE_LIMIT = CONF_PREFIX + "purge.limit";
+    public static final String PURGE_COMMAND_ENABLED = CONF_PREFIX + "enable.command.line";
 
     /**
      * PurgeRunnable is the runnable which is scheduled to run at the configured interval. PurgeCommand is queued to

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
index 64d3f1f..d263567 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseAdminServlet.java
@@ -29,20 +29,28 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.BuildInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.cli.ValidationUtil;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PurgeXCommand;
 import org.apache.oozie.service.AuthorizationException;
 import org.apache.oozie.service.AuthorizationService;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JobsConcurrencyService;
+import org.apache.oozie.service.PurgeService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.util.AuthUrlClient;
 import org.apache.oozie.util.ConfigUtils;
 import org.apache.oozie.util.Instrumentation;
+import org.apache.oozie.util.XLog;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
@@ -50,6 +58,7 @@ import org.json.simple.JSONValue;
 public abstract class BaseAdminServlet extends JsonRestServlet {
 
     private static final long serialVersionUID = 1L;
+    private static final XLog LOG = XLog.getLog(BaseAdminServlet.class);
     protected String modeTag;
 
 
@@ -59,7 +68,13 @@ public abstract class BaseAdminServlet extends JsonRestServlet {
     }
 
     /**
-     * Change safemode state.
+     * Oozie admin PUT request for
+     *      change oozie system mode
+     *      admin purge command
+     * @param request http request
+     * @param response http response
+     * @throws ServletException thrown if any servlet error
+     * @throws IOException  thrown if any I/O error
      */
     @Override
     protected void doPut(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
@@ -68,16 +83,19 @@ public abstract class BaseAdminServlet extends JsonRestServlet {
         request.setAttribute(AUDIT_PARAM, request.getParameter(modeTag));
 
         authorizeRequest(request);
-
-        setOozieMode(request, response, resourceName);
-        /*if (resourceName.equals(RestConstants.ADMIN_STATUS_RESOURCE)) {
-            boolean safeMode = Boolean.parseBoolean(request.getParameter(RestConstants.ADMIN_SAFE_MODE_PARAM));
-            Services.get().setSafeMode(safeMode);
-            response.setStatus(HttpServletResponse.SC_OK);
+        switch (resourceName) {
+            case RestConstants.ADMIN_STATUS_RESOURCE:
+                setOozieMode(request, response, resourceName);
+                break;
+            case RestConstants.ADMIN_PURGE:
+                String msg = schedulePurgeCommand(request);
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put(JsonTags.PURGE, msg);
+                sendJsonResponse(response, HttpServletResponse.SC_OK, jsonObject);
+                break;
+            default:
+                throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, resourceName);
         }
-        else {
-            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0301, resourceName);
-        }*/
     }
 
 
@@ -165,6 +183,41 @@ public abstract class BaseAdminServlet extends JsonRestServlet {
         }
     }
 
+    private String schedulePurgeCommand(HttpServletRequest request) throws XServletException {
+        final String purgeCmdDisabledMsg = "Purge service is not enabled";
+        if (!ConfigurationService.getBoolean(PurgeService.PURGE_COMMAND_ENABLED)) {
+            LOG.warn(purgeCmdDisabledMsg);
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0307, purgeCmdDisabledMsg);
+        }
+        String wfAgeStr = request.getParameter(RestConstants.PURGE_WF_AGE);
+        String coordAgeStr = request.getParameter(RestConstants.PURGE_COORD_AGE);
+        String bundleAgeStr = request.getParameter(RestConstants.PURGE_BUNDLE_AGE);
+        String purgeLimitStr = request.getParameter(RestConstants.PURGE_LIMIT);
+        String oldCoordActionStr = request.getParameter(RestConstants.PURGE_OLD_COORD_ACTION);
+
+        int workflowAge = StringUtils.isBlank(wfAgeStr) ? ConfigurationService.getInt(PurgeService.CONF_OLDER_THAN)
+                : ValidationUtil.parsePositiveInteger(wfAgeStr);
+        int coordAge = StringUtils.isBlank(coordAgeStr) ? ConfigurationService.getInt(PurgeService.COORD_CONF_OLDER_THAN)
+                : ValidationUtil.parsePositiveInteger(coordAgeStr);
+        int bundleAge = StringUtils.isBlank(bundleAgeStr) ? ConfigurationService.getInt(PurgeService.BUNDLE_CONF_OLDER_THAN)
+                : ValidationUtil.parsePositiveInteger(bundleAgeStr);
+        int purgeLimit = StringUtils.isBlank(purgeLimitStr) ? ConfigurationService.getInt(PurgeService.PURGE_LIMIT)
+                : ValidationUtil.parsePositiveInteger(purgeLimitStr);
+        boolean purgeOldCoordAction = StringUtils.isBlank(oldCoordActionStr)
+                ? ConfigurationService.getBoolean(PurgeService.PURGE_OLD_COORD_ACTION)
+                : Boolean.parseBoolean(oldCoordActionStr);
+
+        LOG.info("Executing oozie purge command.");
+        PurgeXCommand purgeXCommand = new PurgeXCommand(workflowAge, coordAge, bundleAge, purgeLimit, purgeOldCoordAction);
+        try {
+            purgeXCommand.call();
+            return "Purge command executed successfully";
+        } catch (CommandException e) {
+            throw new XServletException(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, e.getErrorCode(), e.getMessage(),
+                    e.getCause());
+        }
+    }
+
     /**
      * Gets the list of share lib.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
index fd573d5..d4a0989 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V0AdminServlet.java
@@ -89,18 +89,10 @@ public class V0AdminServlet extends BaseAdminServlet {
     @Override
     protected void setOozieMode(HttpServletRequest request, HttpServletResponse response, String resourceName)
             throws XServletException {
-        if (resourceName.equals(RestConstants.ADMIN_STATUS_RESOURCE)) {
-            boolean safeMode = Boolean.parseBoolean(request.getParameter(modeTag));
-            //Services.get().setSafeMode(safeMode);
-            SYSTEM_MODE sysMode = safeMode == true ? SYSTEM_MODE.NOWEBSERVICE : SYSTEM_MODE.NORMAL;
-            System.out.println(modeTag + " DDDD " + sysMode);
-            Services.get().setSystemMode(sysMode);
-            response.setStatus(HttpServletResponse.SC_OK);
-        }
-        else {
-            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST,
-                                        ErrorCode.E0301, resourceName);
-        }
+        boolean safeMode = Boolean.parseBoolean(request.getParameter(modeTag));
+        SYSTEM_MODE sysMode = safeMode ? SYSTEM_MODE.NOWEBSERVICE : SYSTEM_MODE.NORMAL;
+        Services.get().setSystemMode(sysMode);
+        response.setStatus(HttpServletResponse.SC_OK);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
index 965a19a..fc3a6bc 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V1AdminServlet.java
@@ -18,40 +18,33 @@
 
 package org.apache.oozie.servlet;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.OozieClient.SYSTEM_MODE;
-import org.apache.oozie.client.rest.JMSConnectionInfoBean;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
-import org.apache.oozie.jms.JMSConnectionInfo;
-import org.apache.oozie.jms.JMSJobEventListener;
 import org.apache.oozie.service.CallableQueueService;
-import org.apache.oozie.service.JMSTopicService;
 import org.apache.oozie.service.Services;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
 public class V1AdminServlet extends BaseAdminServlet {
 
     private static final long serialVersionUID = 1L;
     private static final String INSTRUMENTATION_NAME = "v1admin";
-    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[13];
+    private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[14];
 
     static {
         RESOURCES_INFO[0] = new ResourceInfo(RestConstants.ADMIN_STATUS_RESOURCE, Arrays.asList("PUT", "GET"),
-                                             Arrays.asList(new ParameterInfo(RestConstants.ADMIN_SYSTEM_MODE_PARAM, String.class, true,
+                Arrays.asList(new ParameterInfo(RestConstants.ADMIN_SYSTEM_MODE_PARAM, String.class, true,
                                                                              Arrays.asList("PUT"))));
         RESOURCES_INFO[1] = new ResourceInfo(RestConstants.ADMIN_OS_ENV_RESOURCE, Arrays.asList("GET"),
                 Collections.EMPTY_LIST);
@@ -77,7 +70,7 @@ public class V1AdminServlet extends BaseAdminServlet {
                 Collections.EMPTY_LIST);
         RESOURCES_INFO[12] = new ResourceInfo(RestConstants.ADMIN_METRICS_RESOURCE, Arrays.asList("GET"),
                 Collections.EMPTY_LIST);
-
+        RESOURCES_INFO[13] = new ResourceInfo(RestConstants.ADMIN_PURGE, Arrays.asList("PUT"), Collections.EMPTY_LIST);
     }
 
     protected V1AdminServlet(String name) {
@@ -114,15 +107,9 @@ public class V1AdminServlet extends BaseAdminServlet {
     protected void setOozieMode(HttpServletRequest request,
                                 HttpServletResponse response, String resourceName)
             throws XServletException {
-        if (resourceName.equals(RestConstants.ADMIN_STATUS_RESOURCE)) {
-            SYSTEM_MODE sysMode = SYSTEM_MODE.valueOf(request.getParameter(modeTag));
-            Services.get().setSystemMode(sysMode);
-            response.setStatus(HttpServletResponse.SC_OK);
-        }
-        else {
-            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST,
-                                        ErrorCode.E0301, resourceName);
-        }
+        SYSTEM_MODE sysMode = SYSTEM_MODE.valueOf(request.getParameter(modeTag));
+        Services.get().setSystemMode(sysMode);
+        response.setStatus(HttpServletResponse.SC_OK);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 70c34be..03f428a 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -375,6 +375,14 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.PurgeService.enable.command.line</name>
+        <value>true</value>
+        <description>
+            Enable/Disable oozie admin purge command. By default, it is enabled.
+        </description>
+    </property>
+
     <!-- RecoveryService -->
 
     <property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
index d873b96..564db2a 100644
--- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
+++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
@@ -990,6 +990,61 @@ public class TestOozieCLI extends DagServletTestCase {
         });
     }
 
+    public void testAdminPurgeCommand() throws Exception {
+        runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                String oozieUrl = getContextURL();
+                String[] args = new String[]{"admin", "-purge", "wf=1;coord=2;bundle=3;limit=10;oldCoordAction=true", "-oozie",
+                        oozieUrl};
+                String out = runOozieCLIAndGetStdout(args);
+                assertEquals("Purge command executed successfully" + SYSTEM_LINE_SEPARATOR, out);
+                return null;
+            }
+        });
+
+        runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                String oozieUrl = getContextURL();
+                String[] args = new String[]{"admin", "-purge", "wf=1;coord=0;bundle=0;limit=10;oldCoordAction=true", "-oozie",
+                        oozieUrl};
+                String out = runOozieCLIAndGetStdout(args);
+                assertEquals("Purge command executed successfully" + SYSTEM_LINE_SEPARATOR, out);
+                return null;
+            }
+        });
+    }
+
+    public void testAdminPurgeCommandNegative() throws Exception {
+        runTest(END_POINTS, SERVLET_CLASSES, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                String oozieUrl = getContextURL();
+                String[] args = new String[]{"admin", "-purge", "-oozie", oozieUrl};
+                String error = runOozieCLIAndGetStderr(args);
+                assertTrue(error.contains("Missing argument for option: purge"));
+
+                args = new String[]{"admin", "-purge", "invalid=1", "-oozie", oozieUrl};
+                error = runOozieCLIAndGetStderr(args);
+                assertTrue(error.contains("INVALID_INPUT : Invalid purge option [invalid] specified."));
+
+                args = new String[]{"admin", "-purge", "wf=1;coord=", "-oozie", oozieUrl};
+                error = runOozieCLIAndGetStderr(args);
+                assertTrue(error.contains("INVALID_INPUT : Invalid purge option pair [coord=] specified."));
+
+                args = new String[]{"admin", "-purge", "wf=1;coord=-1", "-oozie", oozieUrl};
+                error = runOozieCLIAndGetStderr(args);
+                assertTrue(error.contains("Input value should be a positive integer. Value: -1"));
+
+                args = new String[]{"admin", "-purge", "wf=a", "-oozie", oozieUrl};
+                error = runOozieCLIAndGetStderr(args);
+                assertTrue(error.contains("For input string: \"a\""));
+                return null;
+            }
+        });
+    }
+
     public void testClientBuildVersion() throws Exception {
         String[] args = new String[]{"version"};
         String out = runOozieCLIAndGetStdout(args);

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/test/java/org/apache/oozie/servlet/TestAdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/servlet/TestAdminServlet.java b/core/src/test/java/org/apache/oozie/servlet/TestAdminServlet.java
index ffa3dc1..8e3502c 100644
--- a/core/src/test/java/org/apache/oozie/servlet/TestAdminServlet.java
+++ b/core/src/test/java/org/apache/oozie/servlet/TestAdminServlet.java
@@ -22,8 +22,6 @@ import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.BuildInfo;
-import org.apache.oozie.servlet.V0AdminServlet;
-import org.apache.oozie.servlet.V0JobServlet;
 import org.json.simple.JSONObject;
 import org.json.simple.JSONValue;
 
@@ -267,4 +265,34 @@ public class TestAdminServlet extends DagServletTestCase {
             }
         });
     }
+
+    public void testPurgeServiceV2() throws Exception {
+        runTest("/v2/admin/*", V2AdminServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                URL url = createURL(RestConstants.ADMIN_PURGE, Collections.EMPTY_MAP);
+                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                connection.setRequestMethod("PUT");
+                assertEquals(HttpServletResponse.SC_OK, connection.getResponseCode());
+                return null;
+            }
+        });
+    }
+
+    public void testPurgeServiceV2Negative() throws Exception {
+        runTest("/v2/admin/*", V2AdminServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                Map<String, String> params = new HashMap<>();
+                params.put(RestConstants.PURGE_WF_AGE, "30");
+                params.put(RestConstants.PURGE_COORD_AGE, "7");
+                params.put(RestConstants.PURGE_BUNDLE_AGE, "-7");
+                URL url = createURL(RestConstants.ADMIN_PURGE, params);
+                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                connection.setRequestMethod("PUT");
+                assertEquals(HttpServletResponse.SC_BAD_REQUEST, connection.getResponseCode());
+                return null;
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/core/src/test/java/org/apache/oozie/servlet/TestV1AdminServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/servlet/TestV1AdminServlet.java b/core/src/test/java/org/apache/oozie/servlet/TestV1AdminServlet.java
index 35568e1..a3f3b9c 100644
--- a/core/src/test/java/org/apache/oozie/servlet/TestV1AdminServlet.java
+++ b/core/src/test/java/org/apache/oozie/servlet/TestV1AdminServlet.java
@@ -259,4 +259,17 @@ public class TestV1AdminServlet extends DagServletTestCase {
         });
 
     }
+
+    public void testPurgeServiceV1() throws Exception {
+        runTest("/v1/admin/*", V1AdminServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                URL url = createURL(RestConstants.ADMIN_PURGE, Collections.EMPTY_MAP);
+                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                connection.setRequestMethod("PUT");
+                assertEquals(HttpServletResponse.SC_OK, connection.getResponseCode());
+                return null;
+            }
+        });
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/docs/src/site/twiki/DG_CommandLineTool.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_CommandLineTool.twiki b/docs/src/site/twiki/DG_CommandLineTool.twiki
index 004a35f..6a9e4b4 100644
--- a/docs/src/site/twiki/DG_CommandLineTool.twiki
+++ b/docs/src/site/twiki/DG_CommandLineTool.twiki
@@ -153,6 +153,8 @@ oozie admin <OPTIONS>         : admin operations
             -systemmode <arg>   Supported in Oozie-2.0 or later versions ONLY. Change oozie
                                 system mode [NORMAL|NOWEBSERVICE|SAFEMODE]
             -version            show Oozie server build version
+            -purge <arg>        purge old oozie workflow, coordinator and bundle records from DB (parameter unit: day)
+                                wf=<N>\;coord=<N>\;bundle=<N>\;limit=<N>\;oldcoordaction=<true/false>
 </verbatim>
 
 ---+++ Oozie validate command
@@ -1513,6 +1515,25 @@ hasn't incremented yet will not show up.
 
 *Note:* If Metrics is enabled, then Instrumentation is unavailable.
 
+---+++ Running purge command
+
+The Oozie admin purge command cleans up the Oozie Workflow/Coordinator/Bundle records based on the parameters.
+The unit for the age parameters (wf, coord, bundle) is days.
+The purge command will delete the workflow records (wf=30) older than 30 days, coordinator records (coord=7) older than 7 days and
+bundle records (bundle=7) older than 7 days. The limit (limit=10) defines the number of records to be fetched at a time. Turn
+(oldCoordAction=true/false) =on/off= coordinator action record purging for long running coordinators. If any of the parameters is
+not provided, then it will be taken from the =oozie-default/oozie-site= configuration.
+
+Example:
+
+<verbatim>
+
+$ oozie admin -purge wf=30\;coord=7\;bundle=7\;limit=10\;oldCoordAction=true
+
+Purge command executed successfully
+
+</verbatim>
+
 ---++ Validate Operations
 
 ---+++ Validating a Workflow XML

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/docs/src/site/twiki/WebServicesAPI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/WebServicesAPI.twiki b/docs/src/site/twiki/WebServicesAPI.twiki
index 03f05d2..a63a68c 100644
--- a/docs/src/site/twiki/WebServicesAPI.twiki
+++ b/docs/src/site/twiki/WebServicesAPI.twiki
@@ -570,6 +570,34 @@ Content-Type: application/json;charset=UTF-8
 ]
 </verbatim>
 
+---++++ Purge Command
+
+The Oozie admin purge command cleans up the Oozie Workflow/Coordinator/Bundle records based on the parameters.
+The unit for the age parameters (wf, coord, bundle) is days.
+
+The purge command will delete the workflow records (wf=30) older than 30 days, coordinator records (coord=7) older than 7 days and
+bundle records (bundle=7) older than 7 days. The limit (limit=10) defines the number of records to be fetched at a time. Turn
+(oldCoordAction=true/false) =on/off= coordinator action record purging for long running coordinators. If any of the parameters is
+not provided, then it will be taken from the =oozie-default/oozie-site= configuration.
+
+*Request:*
+
+<verbatim>
+
+PUT /oozie/v2/admin/purge?wf=30&coord=7&bundle=7&limit=10&oldCoordAction=true
+
+</verbatim>
+
+*Response:*
+
+<verbatim>
+
+{
+  "purge": "Purge command executed successfully"
+}
+
+</verbatim>
+
 ---+++ Job and Jobs End-Points
 
 _Modified in Oozie v1 WS API_

http://git-wip-us.apache.org/repos/asf/oozie/blob/9b07ee82/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4a459cb..90b6a02 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2041 Add an admin command to run the PurgeXCommand (abhishekbafna)
 OOZIE-2393 Allow table drop in hcat prepare (abhishekbafna)
 OOZIE-2835 TestIOUtils shall not be an XTestCase (asasvari via pbacsko)
 OOZIE-2817 Increase test case stability in pre-commit job (gezapeti)