You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/04/19 11:05:53 UTC
[1/2] oozie git commit: OOZIE-2827 More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko)
Repository: oozie
Updated Branches:
refs/heads/master 0e1a00044 -> 406183137
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
index 4fc8653..d771f29 100644
--- a/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
+++ b/core/src/test/java/org/apache/oozie/servlet/MockCoordinatorEngineService.java
@@ -38,12 +38,15 @@ import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Execution;
+import org.apache.oozie.client.CoordinatorWfAction;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.Pair;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.CoordinatorWfActionBean;
public class MockCoordinatorEngineService extends CoordinatorEngineService {
public static final String JOB_ID = "coord-job-C-";
@@ -61,9 +64,13 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
public static Integer length = null;
public static String order = null;
public static String filter = null;
+ public static String wfActionName = null;
public static List<CoordinatorJob> coordJobs;
- public static List<Boolean> started;
+ public static List<CoordinatorWfActionBean> coordWfActions;
+ public static List<Boolean> startedCoordJobs;
public static final int INIT_COORD_COUNT = 4;
+ public static final int INIT_WF_ACTION_COUNT = 2;
+
static {
@@ -77,11 +84,17 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
order = null;
filter = null;
coordJobs = new ArrayList<CoordinatorJob>();
- started = new ArrayList<Boolean>();
+ coordWfActions = new ArrayList<CoordinatorWfActionBean>();
+ startedCoordJobs = new ArrayList<Boolean>();
for (int i = 0; i < INIT_COORD_COUNT; i++) {
coordJobs.add(createDummyCoordinatorJob(i));
- started.add(false);
+ startedCoordJobs.add(false);
+ }
+ for(int i = 0; i < INIT_WF_ACTION_COUNT; i++) {
+ coordWfActions.add(createDummyCoordWfAction("actionTest", i+1));
}
+ String nullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason("actionTest2", "wf1");
+ coordWfActions.add(new CoordinatorWfActionBean(INIT_WF_ACTION_COUNT+1, null, nullReason));
}
@Override
@@ -108,7 +121,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
did = "submit";
int idx = coordJobs.size();
coordJobs.add(createDummyCoordinatorJob(idx, conf));
- started.add(startJob);
+ startedCoordJobs.add(startJob);
return JOB_ID + idx;
}
@@ -117,7 +130,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
did = RestConstants.JOB_ACTION_DRYRUN;
int idx = coordJobs.size();
coordJobs.add(createDummyCoordinatorJob(idx, conf));
- started.add(false);
+ startedCoordJobs.add(false);
return JOB_ID + idx;
}
@@ -125,21 +138,21 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
public void resume(String jobId) throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_RESUME;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, true);
+ startedCoordJobs.set(idx, true);
}
@Override
public void suspend(String jobId) throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_SUSPEND;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, false);
+ startedCoordJobs.set(idx, false);
}
@Override
public void kill(String jobId) throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_KILL;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, false);
+ startedCoordJobs.set(idx, false);
}
@Override
@@ -147,7 +160,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_KILL;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, false);
+ startedCoordJobs.set(idx, false);
List<CoordinatorAction> actions = coordJobs.get(idx).getActions();
List<CoordinatorActionBean> actionBeans = new ArrayList<CoordinatorActionBean>();
@@ -162,7 +175,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
public void change(String jobId, String changeValue) throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_CHANGE;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, true);
+ startedCoordJobs.set(idx, true);
}
@Override
@@ -174,7 +187,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException {
did = RestConstants.JOB_ACTION_IGNORE;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, true);
+ startedCoordJobs.set(idx, true);
return null;
}
@Override
@@ -182,7 +195,7 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
boolean noCleanup, boolean failed, Configuration conf) throws BaseEngineException {
did = RestConstants.JOB_COORD_ACTION_RERUN;
int idx = validateCoordinatorIdx(jobId);
- started.set(idx, true);
+ startedCoordJobs.set(idx, true);
List<CoordinatorAction> actions = coordJobs.get(idx).getActions();
List<CoordinatorActionBean> actionBeans = new ArrayList<CoordinatorActionBean>();
for (CoordinatorAction action : actions) {
@@ -296,6 +309,16 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
did = RestConstants.SLA_ENABLE_ALERT;
}
+ @Override
+ public List<CoordinatorWfActionBean> getWfActionByJobIdAndName(String jobId, String wfActionName, int offset, int len)
+ throws CoordinatorEngineException {
+ did = RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD;
+ MockCoordinatorEngineService.offset = offset;
+ MockCoordinatorEngineService.length = len;
+ MockCoordinatorEngineService.wfActionName = wfActionName;
+ return coordWfActions;
+ }
+
private int validateCoordinatorIdx(String jobId) throws CoordinatorEngineException {
int idx = -1;
try {
@@ -397,4 +420,12 @@ public class MockCoordinatorEngineService extends CoordinatorEngineService {
return action;
}
+ private static CoordinatorWfActionBean createDummyCoordWfAction(String name, int id) {
+ CoordinatorWfActionBean coordWfAction = null;
+ WorkflowActionBean wfAction = new WorkflowActionBean();
+ wfAction.setName(name);
+ coordWfAction = new CoordinatorWfActionBean(id, wfAction, null);
+ return coordWfAction;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java b/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java
index fb203a6..bacfe89 100644
--- a/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java
+++ b/core/src/test/java/org/apache/oozie/servlet/TestV2JobServlet.java
@@ -18,9 +18,12 @@
package org.apache.oozie.servlet;
+import org.apache.oozie.client.CoordinatorWfAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.service.ConfigurationService;
+import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
@@ -256,4 +259,190 @@ public class TestV2JobServlet extends DagServletTestCase {
}
});
}
+
+ //test normal request
+ public void testGetWfActionByJobIdAndNameNormal() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "2");
+ params.put(RestConstants.LEN_PARAM, "2");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ assertTrue(conn.getHeaderField("content-type").startsWith(RestConstants.JSON_CONTENT_TYPE));
+ JSONObject obj = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
+ assertEquals(MockCoordinatorEngineService.JOB_ID+1, obj.get(JsonTags.COORDINATOR_JOB_ID));
+ assertEquals(RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD, MockCoordinatorEngineService.did);
+ assertEquals(MockCoordinatorEngineService.offset.intValue(), 2);
+ assertEquals(MockCoordinatorEngineService.length.intValue(), 2);
+ JSONArray coordWfActions = (JSONArray) obj.get(JsonTags.COORDINATOR_WF_ACTIONS);
+ assertEquals(coordWfActions.size(), 3);
+ for(int i = 0; i < coordWfActions.size(); i++) {
+ JSONObject coordWfAction = (JSONObject) coordWfActions.get(i);
+ JSONObject wfAction = (JSONObject) coordWfAction.get(JsonTags.COORDINATOR_WF_ACTION);
+ if (i == (coordWfActions.size() - 1)) {
+ assertEquals(null, wfAction);
+ String nullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason("actionTest2", "wf1");
+ assertEquals(nullReason, coordWfAction.get(JsonTags.COORDINATOR_WF_ACTION_NULL_REASON));
+ }
+ else {
+ assertEquals("actionTest", wfAction.get(JsonTags.WORKFLOW_ACTION_NAME));
+ }
+ }
+
+ return null;
+ }
+ });
+ }
+
+ //test missing parameter action-name
+ public void testGetWfActionByJobIdAndNameActionNameMissing() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ //test unparseable offset
+ public void testGetWfActionByJobIdAndNameUnparseableOffset() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "2abc");
+ params.put(RestConstants.LEN_PARAM, "2");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ //test unparseable len
+ public void testGetWfActionByJobIdAndNameUnparseableLen() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "2");
+ params.put(RestConstants.LEN_PARAM, "2abc");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ public void testGetWfActionByJobIdAndNameOffsetOutOfRange() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "-1");
+ params.put(RestConstants.LEN_PARAM, "2");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ assertEquals(MockCoordinatorEngineService.offset.intValue(), 1);
+
+ return null;
+ }
+ });
+ }
+
+ public void testGetWfActionByJobIdAndNameLenOutOfRange() throws Exception {
+ runTest("/v2/job/*", V2JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "1");
+ params.put(RestConstants.LEN_PARAM, "-1");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ assertEquals(MockCoordinatorEngineService.length.intValue(),
+ ConfigurationService.getInt("oozie.coord.actions.default.length"));
+
+ return null;
+ }
+ });
+ }
+
+ public void testGetWfActionFromV0JobServlet() throws Exception {
+ runTest("/v0/job/*", V0JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "2");
+ params.put(RestConstants.LEN_PARAM, "2");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ public void testGetWfActionFromV1JobServlet() throws Exception {
+ runTest("/v1/job/*", V1JobServlet.class, IS_SECURITY_ENABLED, new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ MockCoordinatorEngineService.reset();
+ Map<String, String> params = new HashMap<String, String>();
+ params = new HashMap<String, String>();
+ params.put(RestConstants.JOB_SHOW_PARAM, RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD);
+ params.put(RestConstants.OFFSET_PARAM, "2");
+ params.put(RestConstants.LEN_PARAM, "2");
+ params.put(RestConstants.ACTION_NAME_PARAM, "actionTest");
+ URL url = createURL(MockCoordinatorEngineService.JOB_ID + 1, params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+ return null;
+ }
+ });
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 69c82ff..71af781 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2827 More directly view of the coordinator\u2019s history from perspective of workflow action. (Alonzo Zhou via pbacsko)
OOZIE-2864 Maven artifacts for package com.codahale.metrics have inconsistent groupId (andras.piros via pbacsko)
OOZIE-2862 Coord change command doesn't change job to running if job was killed without creating any actions (puru)
OOZIE-2815 Oozie not always display job log (puru)
[2/2] oozie git commit: OOZIE-2827 More directly view of the coordinator’s history from perspective of workflow action. (Alonzo Zhou via pbacsko)
Posted by pb...@apache.org.
OOZIE-2827 More directly view of the coordinator\u2019s history from perspective of workflow action. (Alonzo Zhou via pbacsko)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/40618313
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/40618313
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/40618313
Branch: refs/heads/master
Commit: 4061831378daca80191e5abe296c3ee75e79c8bd
Parents: 0e1a000
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Wed Apr 19 13:05:36 2017 +0200
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Wed Apr 19 13:05:36 2017 +0200
----------------------------------------------------------------------
.../oozie/client/CoordinatorWfAction.java | 48 +
.../org/apache/oozie/client/rest/JsonTags.java | 5 +
.../apache/oozie/client/rest/RestConstants.java | 4 +
.../org/apache/oozie/CoordinatorEngine.java | 20 +
.../apache/oozie/CoordinatorEngine.java.orig | 966 +++++++++++++++++++
.../apache/oozie/CoordinatorWfActionBean.java | 86 ++
.../main/java/org/apache/oozie/ErrorCode.java | 1 +
.../coord/CoordWfActionInfoXCommand.java | 144 +++
.../jpa/WorkflowActionGetJPAExecutor.java | 16 +-
.../apache/oozie/servlet/BaseJobServlet.java | 22 +
.../org/apache/oozie/servlet/V2JobServlet.java | 45 +
.../org/apache/oozie/client/TestOozieCLI.java | 38 +-
.../coord/TestCoordWfActionInfoXCommand.java | 174 ++++
.../servlet/MockCoordinatorEngineService.java | 55 +-
.../apache/oozie/servlet/TestV2JobServlet.java | 189 ++++
release-log.txt | 1 +
16 files changed, 1782 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java b/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java
new file mode 100644
index 0000000..056981d
--- /dev/null
+++ b/client/src/main/java/org/apache/oozie/client/CoordinatorWfAction.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.client;
+
+import java.text.MessageFormat;
+
+public interface CoordinatorWfAction{
+
+ enum NullReason{
+
+ ACTION_NULL("Could not get workflow action, no action named {0} in workflow {1}"),
+ PARENT_NULL("Could not get workflow action, workflow instance is null"),;
+
+ private String template;
+
+ NullReason(String template) {
+ this.template = template;
+ }
+
+ public String getNullReason(Object... args) {
+ return MessageFormat.format(template, args);
+ }
+
+ }
+
+ int getActionNumber();
+
+ WorkflowAction getAction();
+
+ String getNullReason();
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
index ca168e0..7b6f50c 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
@@ -143,6 +143,11 @@ public interface JsonTags {
String COORDINATOR_ACTION_DATASETS = "dataSets";
String COORDINATOR_ACTION_DATASET = "dataSet";
+ String COORDINATOR_WF_ACTION_NUMBER = "actionNumber";
+ String COORDINATOR_WF_ACTION = "action";
+ String COORDINATOR_WF_ACTION_NULL_REASON = "nullReason";
+ String COORDINATOR_WF_ACTIONS = "actions";
+
String BUNDLE_JOB_ID = "bundleJobId";
String BUNDLE_JOB_NAME = "bundleJobName";
String BUNDLE_JOB_PATH = "bundleJobPath";
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
index 8ddb1f8..f477531 100644
--- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
+++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
@@ -49,6 +49,8 @@ public interface RestConstants {
String ORDER_PARAM = "order";
+ String ACTION_NAME_PARAM = "action-name";
+
String JOB_FILTER_PARAM = "filter";
String JOB_RESOURCE = "/job";
@@ -101,6 +103,8 @@ public interface RestConstants {
String JOB_SHOW_STATUS = "status";
+ String JOB_SHOW_WF_ACTIONS_IN_COORD = "wf-actions";
+
String JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM = "coord-scope";
String JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM = "date-scope";
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
index 2c04bea..85de6f9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java
@@ -61,6 +61,7 @@ import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
import org.apache.oozie.command.coord.CoordUpdateXCommand;
+import org.apache.oozie.command.coord.CoordWfActionInfoXCommand;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
@@ -963,4 +964,23 @@ public class CoordinatorEngine extends BaseEngine {
String actions, String dates) throws CommandException {
return new CoordActionMissingDependenciesXCommand(id, actions, dates).call();
}
+
+ /**
+ * get wf actions by action name in a coordinator job
+ * @param jobId coordinator job id
+ * @param wfActionName workflow action name
+ * @param offset
+ * @param len
+ * @return list of CoordinatorWfActionBean in a coordinator
+ * @throws CoordinatorEngineException
+ */
+ public List<CoordinatorWfActionBean> getWfActionByJobIdAndName(String jobId, String wfActionName, int offset, int len)
+ throws CoordinatorEngineException {
+ try {
+ return new CoordWfActionInfoXCommand(jobId, wfActionName, offset, len).call();
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig
new file mode 100644
index 0000000..2c04bea
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java.orig
@@ -0,0 +1,966 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import java.io.IOException;
+import java.io.Writer;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.OperationType;
+import org.apache.oozie.command.coord.BulkCoordXCommand;
+import org.apache.oozie.command.coord.CoordActionInfoXCommand;
+import org.apache.oozie.command.coord.CoordActionsIgnoreXCommand;
+import org.apache.oozie.command.coord.CoordActionsKillXCommand;
+import org.apache.oozie.command.coord.CoordChangeXCommand;
+import org.apache.oozie.command.coord.CoordActionMissingDependenciesXCommand;
+import org.apache.oozie.command.coord.CoordJobXCommand;
+import org.apache.oozie.command.coord.CoordJobsXCommand;
+import org.apache.oozie.command.coord.CoordKillXCommand;
+import org.apache.oozie.command.coord.CoordRerunXCommand;
+import org.apache.oozie.command.coord.CoordResumeXCommand;
+import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand;
+import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand;
+import org.apache.oozie.command.coord.CoordSLAChangeXCommand;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.command.coord.CoordSuspendXCommand;
+import org.apache.oozie.command.coord.CoordUpdateXCommand;
+import org.apache.oozie.dependency.ActionDependency;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
+import org.apache.oozie.service.DagXLogInfoService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.XLogStreamingService;
+import org.apache.oozie.util.CoordActionsInDateRange;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.JobUtils;
+import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XLogFilter;
+import org.apache.oozie.util.XLogStreamer;
+import org.apache.oozie.util.XLogUserFilterParam;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class CoordinatorEngine extends BaseEngine {
+ private static final XLog LOG = XLog.getLog(CoordinatorEngine.class);
+ public final static String COORD_ACTIONS_LOG_MAX_COUNT = "oozie.coord.actions.log.max.count";
+ private final static int COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT = 50;
+ private final int maxNumActionsForLog;
+
+ public enum FILTER_COMPARATORS {
+ //This ordering is important, dont change this
+ GREATER_EQUAL(">="), GREATER(">"), LESSTHAN_EQUAL("<="), LESSTHAN("<"), NOT_EQUALS("!="), EQUALS("=");
+
+ private final String sign;
+
+ FILTER_COMPARATORS(String sign) {
+ this.sign = sign;
+ }
+
+ public String getSign() {
+ return sign;
+ }
+ }
+
+ public static final String[] VALID_JOB_FILTERS = {OozieClient.FILTER_STATUS, OozieClient.FILTER_NOMINAL_TIME};
+
+ /**
+ * Create a system Coordinator engine, with no user and no group.
+ */
+ public CoordinatorEngine() {
+ maxNumActionsForLog = Services.get().getConf()
+ .getInt(COORD_ACTIONS_LOG_MAX_COUNT, COORD_ACTIONS_LOG_MAX_COUNT_DEFAULT);
+ }
+
+ /**
+ * Create a Coordinator engine to perform operations on behave of a user.
+ *
+ * @param user user name.
+ */
+ public CoordinatorEngine(String user) {
+ this();
+ this.user = ParamChecker.notEmpty(user, "user");
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getDefinition(java.lang.String)
+ */
+ @Override
+ public String getDefinition(String jobId) throws BaseEngineException {
+ CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+ return job.getOrigJobXml();
+ }
+
+ /**
+ * @param jobId
+ * @return CoordinatorJobBean
+ * @throws BaseEngineException
+ */
+ private CoordinatorJobBean getCoordJobWithNoActionInfo(String jobId) throws BaseEngineException {
+ try {
+ return new CoordJobXCommand(jobId).call();
+ }
+ catch (CommandException ex) {
+ throw new BaseEngineException(ex);
+ }
+ }
+
+ /**
+ * @param actionId
+ * @return CoordinatorActionBean
+ * @throws BaseEngineException
+ */
+ public CoordinatorActionBean getCoordAction(String actionId) throws BaseEngineException {
+ try {
+ return new CoordActionInfoXCommand(actionId).call();
+ }
+ catch (CommandException ex) {
+ throw new BaseEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String)
+ */
+ @Override
+ public CoordinatorJobBean getCoordJob(String jobId) throws BaseEngineException {
+ try {
+ return new CoordJobXCommand(jobId).call();
+ }
+ catch (CommandException ex) {
+ throw new BaseEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getCoordJob(java.lang.String, java.lang.String, int, int)
+ */
+ @Override
+ public CoordinatorJobBean getCoordJob(String jobId, String filter, int offset, int length, boolean desc)
+ throws BaseEngineException {
+ Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = parseJobFilter(filter);
+ try {
+ return new CoordJobXCommand(jobId, filterMap, offset, length, desc).call();
+ }
+ catch (CommandException ex) {
+ throw new BaseEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getJobIdForExternalId(java.lang.String)
+ */
+ @Override
+ public String getJobIdForExternalId(String externalId) throws CoordinatorEngineException {
+ return null;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#kill(java.lang.String)
+ */
+ @Override
+ public void kill(String jobId) throws CoordinatorEngineException {
+ try {
+ new CoordKillXCommand(jobId).call();
+ LOG.info("User " + user + " killed the Coordinator job " + jobId);
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ public CoordinatorActionInfo killActions(String jobId, String rangeType, String scope) throws CoordinatorEngineException {
+ try {
+ return new CoordActionsKillXCommand(jobId, rangeType, scope).call();
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.BaseEngine#change(java.lang.String, java.lang.String)
+ */
+ @Override
+ public void change(String jobId, String changeValue) throws CoordinatorEngineException {
+ try {
+ new CoordChangeXCommand(jobId, changeValue).call();
+ LOG.info("User " + user + " changed the Coordinator job [" + jobId + "] to " + changeValue);
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ public CoordinatorActionInfo ignore(String jobId, String type, String scope) throws CoordinatorEngineException {
+ try {
+ LOG.info("User " + user + " ignore a Coordinator Action (s) [" + scope + "] of the Coordinator Job ["
+ + jobId + "]");
+ return new CoordActionsIgnoreXCommand(jobId, type, scope).call();
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ @Override
+ @Deprecated
+ public void reRun(String jobId, Configuration conf) throws BaseEngineException {
+ throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of rerun"));
+ }
+
+ /**
+ * Rerun coordinator actions for given rerunType
+ *
+ * @param jobId
+ * @param rerunType
+ * @param scope
+ * @param refresh
+ * @param noCleanup
+ * @throws BaseEngineException
+ */
+ public CoordinatorActionInfo reRun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup,
+ boolean failed, Configuration conf)
+ throws BaseEngineException {
+ try {
+ return new CoordRerunXCommand(jobId, rerunType, scope, refresh,
+ noCleanup, failed, conf).call();
+ }
+ catch (CommandException ex) {
+ throw new BaseEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#resume(java.lang.String)
+ */
+ @Override
+ public void resume(String jobId) throws CoordinatorEngineException {
+ try {
+ new CoordResumeXCommand(jobId).call();
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ @Override
+ @Deprecated
+ public void start(String jobId) throws BaseEngineException {
+ throw new BaseEngineException(new XException(ErrorCode.E0301, "invalid use of start"));
+ }
+
+
+ @Override
+ protected void streamJobLog(XLogStreamer logStreamer, String jobId, Writer writer)
+ throws IOException, BaseEngineException {
+ logStreamer.getXLogFilter().setParameter(DagXLogInfoService.JOB, jobId);
+ Date lastTime = null;
+ CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+ if (job.isTerminalStatus()) {
+ lastTime = job.getLastModifiedTime();
+ }
+ if (lastTime == null) {
+ lastTime = new Date();
+ }
+ Services.get().get(XLogStreamingService.class).streamLog(logStreamer, job.getCreatedTime(), lastTime, writer);
+ }
+
+ /**
+ * Add list of actions to the filter based on conditions
+ *
+ * @param jobId Job Id
+ * @param logRetrievalScope Value for the retrieval type
+ * @param logRetrievalType Based on which filter criteria the log is retrieved
+ * @param writer writer to stream the log to
+ * @param requestParameters additional parameters from the request
+ * @throws IOException
+ * @throws BaseEngineException
+ * @throws CommandException
+ */
+ public void streamLog(String jobId, String logRetrievalScope, String logRetrievalType, Writer writer,
+ Map<String, String[]> requestParameters) throws IOException, BaseEngineException, CommandException {
+
+ Date startTime = null;
+ Date endTime = null;
+ XLogFilter filter = new XLogFilter(new XLogUserFilterParam(requestParameters));
+
+ filter.setParameter(DagXLogInfoService.JOB, jobId);
+ if (logRetrievalScope != null && logRetrievalType != null) {
+ // if coordinator action logs are to be retrieved based on action id range
+ if (logRetrievalType.equals(RestConstants.JOB_LOG_ACTION)) {
+ // Use set implementation that maintains order or elements to achieve reproducibility:
+ Set<String> actionSet = new LinkedHashSet<String>();
+ String[] list = logRetrievalScope.split(",");
+ for (String s : list) {
+ s = s.trim();
+ if (s.contains("-")) {
+ String[] range = s.split("-");
+ if (range.length != 2) {
+ throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s
+ + "'");
+ }
+ int start;
+ int end;
+ try {
+ start = Integer.parseInt(range[0].trim());
+ } catch (NumberFormatException ne) {
+ throw new CommandException(ErrorCode.E0302, "could not parse " + range[0].trim() + "into an integer",
+ ne);
+ }
+ try {
+ end = Integer.parseInt(range[1].trim());
+ } catch (NumberFormatException ne) {
+ throw new CommandException(ErrorCode.E0302, "could not parse " + range[1].trim() + "into an integer",
+ ne);
+ }
+ if (start > end) {
+ throw new CommandException(ErrorCode.E0302, "format is wrong for action's range '" + s + "'");
+ }
+ for (int i = start; i <= end; i++) {
+ actionSet.add(jobId + "@" + i);
+ }
+ }
+ else {
+ try {
+ Integer.parseInt(s);
+ }
+ catch (NumberFormatException ne) {
+ throw new CommandException(ErrorCode.E0302, "format is wrong for action id'" + s
+ + "'. Integer only.");
+ }
+ actionSet.add(jobId + "@" + s);
+ }
+ }
+
+ if (actionSet.size() >= maxNumActionsForLog) {
+ throw new CommandException(ErrorCode.E0302,
+ "Retrieving log of too many coordinator actions. Max count is "
+ + maxNumActionsForLog + " actions");
+ }
+ Iterator<String> actionsIterator = actionSet.iterator();
+ StringBuilder orSeparatedActions = new StringBuilder("");
+ boolean orRequired = false;
+ while (actionsIterator.hasNext()) {
+ if (orRequired) {
+ orSeparatedActions.append("|");
+ }
+ orSeparatedActions.append(actionsIterator.next().toString());
+ orRequired = true;
+ }
+ if (actionSet.size() > 1 && orRequired) {
+ orSeparatedActions.insert(0, "(");
+ orSeparatedActions.append(")");
+ }
+
+ filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
+ if (actionSet != null && actionSet.size() == 1) {
+ CoordinatorActionBean actionBean = getCoordAction(actionSet.iterator().next());
+ startTime = actionBean.getCreatedTime();
+ endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
+ .getLastModifiedTime();
+ filter.setActionList(true);
+ }
+ else if (actionSet != null && actionSet.size() > 0) {
+ List<String> tempList = new ArrayList<String>(actionSet);
+ Collections.sort(tempList, new Comparator<String>() {
+ public int compare(String a, String b) {
+ return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
+ Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
+ }
+ });
+ startTime = getCoordAction(tempList.get(0)).getCreatedTime();
+ endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, tempList.get(0),
+ tempList.get(tempList.size() - 1));
+ filter.setActionList(true);
+ }
+ }
+ // if coordinator action logs are to be retrieved based on date range
+ // this block gets the corresponding list of coordinator actions to be used by the log filter
+ if (logRetrievalType.equalsIgnoreCase(RestConstants.JOB_LOG_DATE)) {
+ List<String> coordActionIdList = null;
+ try {
+ coordActionIdList = CoordActionsInDateRange.getCoordActionIdsFromDates(jobId, logRetrievalScope);
+ }
+ catch (XException xe) {
+ throw new CommandException(ErrorCode.E0302, "Error in date range for coordinator actions", xe);
+ }
+ if(coordActionIdList.size() >= maxNumActionsForLog) {
+ throw new CommandException(ErrorCode.E0302,
+ "Retrieving log of too many coordinator actions. Max count is "
+ + maxNumActionsForLog + " actions");
+ }
+ StringBuilder orSeparatedActions = new StringBuilder("");
+ boolean orRequired = false;
+ for (String coordActionId : coordActionIdList) {
+ if (orRequired) {
+ orSeparatedActions.append("|");
+ }
+ orSeparatedActions.append(coordActionId);
+ orRequired = true;
+ }
+ if (coordActionIdList.size() > 1 && orRequired) {
+ orSeparatedActions.insert(0, "(");
+ orSeparatedActions.append(")");
+ }
+ filter.setParameter(DagXLogInfoService.ACTION, orSeparatedActions.toString());
+ if (coordActionIdList != null && coordActionIdList.size() == 1) {
+ CoordinatorActionBean actionBean = getCoordAction(coordActionIdList.get(0));
+ startTime = actionBean.getCreatedTime();
+ endTime = actionBean.getStatus().equals(CoordinatorAction.Status.RUNNING) ? new Date() : actionBean
+ .getLastModifiedTime();
+ filter.setActionList(true);
+ }
+ else if (coordActionIdList != null && coordActionIdList.size() > 0) {
+ Collections.sort(coordActionIdList, new Comparator<String>() {
+ public int compare(String a, String b) {
+ return Integer.valueOf(a.substring(a.lastIndexOf("@") + 1)).compareTo(
+ Integer.valueOf(b.substring(b.lastIndexOf("@") + 1)));
+ }
+ });
+ startTime = getCoordAction(coordActionIdList.get(0)).getCreatedTime();
+ endTime = CoordActionsInDateRange.getCoordActionsLastModifiedDate(jobId, coordActionIdList.get(0),
+ coordActionIdList.get(coordActionIdList.size() - 1));
+ filter.setActionList(true);
+ }
+ }
+ }
+ if (startTime == null || endTime == null) {
+ CoordinatorJobBean job = getCoordJobWithNoActionInfo(jobId);
+ if (startTime == null) {
+ startTime = job.getCreatedTime();
+ }
+ if (endTime == null) {
+ if (job.isTerminalStatus()) {
+ endTime = job.getLastModifiedTime();
+ }
+ if (endTime == null) {
+ endTime = new Date();
+ }
+ }
+ }
+ Services.get().get(XLogStreamingService.class).streamLog(new XLogStreamer(filter, requestParameters), startTime,
+ endTime, writer);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oozie.BaseEngine#submitJob(org.apache.hadoop.conf.Configuration
+ * , boolean)
+ */
+ @Override
+ public String submitJob(Configuration conf, boolean startJob) throws CoordinatorEngineException {
+ try {
+ CoordSubmitXCommand submit = new CoordSubmitXCommand(conf);
+ return submit.call();
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.oozie.BaseEngine#dryRunSubmit(org.apache.hadoop.conf.Configuration)
+ */
+ @Override
+ public String dryRunSubmit(Configuration conf) throws CoordinatorEngineException {
+ try {
+ CoordSubmitXCommand submit = new CoordSubmitXCommand(true, conf);
+ return submit.call();
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#suspend(java.lang.String)
+ */
+ @Override
+ public void suspend(String jobId) throws CoordinatorEngineException {
+ try {
+ new CoordSuspendXCommand(jobId).call();
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getJob(java.lang.String)
+ */
+ @Override
+ public WorkflowJob getJob(String jobId) throws BaseEngineException {
+ throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.BaseEngine#getJob(java.lang.String, int, int)
+ */
+ @Override
+ public WorkflowJob getJob(String jobId, int start, int length) throws BaseEngineException {
+ throw new BaseEngineException(new XException(ErrorCode.E0301, "cannot get a workflow job from CoordinatorEngine"));
+ }
+
+ private static final Set<String> FILTER_NAMES = new HashSet<String>();
+
+ static {
+ FILTER_NAMES.add(OozieClient.FILTER_USER);
+ FILTER_NAMES.add(OozieClient.FILTER_NAME);
+ FILTER_NAMES.add(OozieClient.FILTER_GROUP);
+ FILTER_NAMES.add(OozieClient.FILTER_STATUS);
+ FILTER_NAMES.add(OozieClient.FILTER_ID);
+ FILTER_NAMES.add(OozieClient.FILTER_FREQUENCY);
+ FILTER_NAMES.add(OozieClient.FILTER_UNIT);
+ FILTER_NAMES.add(OozieClient.FILTER_SORT_BY);
+ FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_START);
+ FILTER_NAMES.add(OozieClient.FILTER_CREATED_TIME_END);
+ FILTER_NAMES.add(OozieClient.FILTER_TEXT);
+ }
+
+ /**
+ * @param filter
+ * @param start
+ * @param len
+ * @return CoordinatorJobInfo
+ * @throws CoordinatorEngineException
+ */
+ public CoordinatorJobInfo getCoordJobs(String filter, int start, int len) throws CoordinatorEngineException {
+ Map<String, List<String>> filterList = parseJobsFilter(filter);
+
+ try {
+ return new CoordJobsXCommand(filterList, start, len).call();
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ // Parses the filter string (e.g status=RUNNING;status=WAITING) and returns a list of status values
+ public Map<Pair<String, FILTER_COMPARATORS>, List<Object>> parseJobFilter(String filter) throws
+ CoordinatorEngineException {
+ Map<Pair<String, FILTER_COMPARATORS>, List<Object>> filterMap = new HashMap<Pair<String,
+ FILTER_COMPARATORS>, List<Object>>();
+ if (filter != null) {
+ //split name value pairs
+ StringTokenizer st = new StringTokenizer(filter, ";");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken().trim();
+ Pair<String, FILTER_COMPARATORS> pair = null;
+ for (FILTER_COMPARATORS comp : FILTER_COMPARATORS.values()) {
+ if (token.contains(comp.getSign())) {
+ int index = token.indexOf(comp.getSign());
+ String key = token.substring(0, index);
+ String valueStr = token.substring(index + comp.getSign().length());
+ Object value;
+
+ if (key.equalsIgnoreCase(OozieClient.FILTER_STATUS)) {
+ value = valueStr.toUpperCase();
+ try {
+ CoordinatorAction.Status.valueOf((String) value);
+ } catch (IllegalArgumentException ex) {
+ // Check for incorrect status value
+ throw new CoordinatorEngineException(ErrorCode.E0421, filter,
+ XLog.format("invalid status value [{0}]." + " Valid status values are: [{1}]",
+ valueStr, StringUtils.join(CoordinatorAction.Status.values(), ", ")));
+ }
+
+ if (!(comp == FILTER_COMPARATORS.EQUALS || comp == FILTER_COMPARATORS.NOT_EQUALS)) {
+ throw new CoordinatorEngineException(ErrorCode.E0421, filter,
+ XLog.format("invalid comparator [{0}] for status." + " Valid are = and !=",
+ comp.getSign()));
+ }
+
+ pair = Pair.of(OozieClient.FILTER_STATUS, comp);
+ } else if (key.equalsIgnoreCase(OozieClient.FILTER_NOMINAL_TIME)) {
+ try {
+ value = new Timestamp(DateUtils.parseDateUTC(valueStr).getTime());
+ } catch (ParseException e) {
+ throw new CoordinatorEngineException(ErrorCode.E0421, filter,
+ XLog.format("invalid nominal time [{0}]." + " Valid format: " +
+ "[{1}]", valueStr, DateUtils.ISO8601_UTC_MASK));
+ }
+ pair = Pair.of(OozieClient.FILTER_NOMINAL_TIME, comp);
+ } else {
+ // Check for incorrect filter option
+ throw new CoordinatorEngineException(ErrorCode.E0421, filter,
+ XLog.format("invalid filter [{0}]." + " Valid filters [{1}]", key, StringUtils.join
+ (VALID_JOB_FILTERS, ", ")));
+ }
+ if (!filterMap.containsKey(pair)) {
+ filterMap.put(pair, new ArrayList<Object>());
+ }
+ filterMap.get(pair).add(value);
+ break;
+ }
+ }
+
+ if (pair == null) {
+ //token doesn't contain comparator
+ throw new CoordinatorEngineException(ErrorCode.E0421, filter,
+ "filter should be of format <key><comparator><value> pairs");
+ }
+ }
+ }
+ return filterMap;
+ }
+
+ /**
+ * @param filter
+ * @return Map<String, List<String>>
+ * @throws CoordinatorEngineException
+ */
+ @VisibleForTesting
+ Map<String, List<String>> parseJobsFilter(String filter) throws CoordinatorEngineException {
+ Map<String, List<String>> map = new HashMap<String, List<String>>();
+ boolean isTimeUnitSpecified = false;
+ String timeUnit = "MINUTE";
+ boolean isFrequencySpecified = false;
+ String frequency = "";
+ if (filter != null) {
+ StringTokenizer st = new StringTokenizer(filter, ";");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ if (token.contains("=")) {
+ String[] pair = token.split("=");
+ if (pair.length != 2) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter,
+ "elements must be semicolon-separated name=value pairs");
+ }
+ pair[0] = pair[0].toLowerCase();
+ if (!FILTER_NAMES.contains(pair[0])) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format("invalid name [{0}]",
+ pair[0]));
+ }
+ if (pair[0].equalsIgnoreCase("frequency")) {
+ isFrequencySpecified = true;
+ try {
+ frequency = (int) Float.parseFloat(pair[1]) + "";
+ continue;
+ }
+ catch (NumberFormatException NANException) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
+ "invalid value [{0}] for frequency. A numerical value is expected", pair[1]));
+ }
+ }
+ if (pair[0].equalsIgnoreCase("unit")) {
+ isTimeUnitSpecified = true;
+ timeUnit = pair[1];
+ if (!timeUnit.equalsIgnoreCase("months") && !timeUnit.equalsIgnoreCase("days")
+ && !timeUnit.equalsIgnoreCase("hours") && !timeUnit.equalsIgnoreCase("minutes")) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
+ "invalid value [{0}] for time unit. "
+ + "Valid value is one of months, days, hours or minutes", pair[1]));
+ }
+ continue;
+ }
+ if (pair[0].equals("status")) {
+ try {
+ CoordinatorJob.Status.valueOf(pair[1]);
+ }
+ catch (IllegalArgumentException ex) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter, XLog.format(
+ "invalid status [{0}]", pair[1]));
+ }
+ }
+ List<String> list = map.get(pair[0]);
+ if (list == null) {
+ list = new ArrayList<String>();
+ map.put(pair[0], list);
+ }
+ list.add(pair[1]);
+ } else {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter,
+ "elements must be semicolon-separated name=value pairs");
+ }
+ }
+ // Unit is specified and frequency is not specified
+ if (!isFrequencySpecified && isTimeUnitSpecified) {
+ throw new CoordinatorEngineException(ErrorCode.E0420, filter, "time unit should be added only when "
+ + "frequency is specified. Either specify frequency also or else remove the time unit");
+ } else if (isFrequencySpecified) {
+ // Frequency value is specified
+ if (isTimeUnitSpecified) {
+ if (timeUnit.equalsIgnoreCase("months")) {
+ timeUnit = "MONTH";
+ } else if (timeUnit.equalsIgnoreCase("days")) {
+ timeUnit = "DAY";
+ } else if (timeUnit.equalsIgnoreCase("hours")) {
+ // When job details are persisted to database, frequency in hours are converted to minutes.
+ // This conversion is to conform with that.
+ frequency = Integer.parseInt(frequency) * 60 + "";
+ timeUnit = "MINUTE";
+ } else if (timeUnit.equalsIgnoreCase("minutes")) {
+ timeUnit = "MINUTE";
+ }
+ }
+ // Adding the frequency and time unit filters to the filter map
+ List<String> list = new ArrayList<String>();
+ list.add(timeUnit);
+ map.put("unit", list);
+ list = new ArrayList<String>();
+ list.add(frequency);
+ map.put("frequency", list);
+ }
+ }
+ return map;
+ }
+
+ public List<WorkflowJobBean> getReruns(String coordActionId) throws CoordinatorEngineException {
+ List<WorkflowJobBean> wfBeans;
+ try {
+ wfBeans = WorkflowJobQueryExecutor.getInstance().getList(WorkflowJobQuery.GET_WORKFLOWS_PARENT_COORD_RERUN,
+ coordActionId);
+ }
+ catch (JPAExecutorException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ return wfBeans;
+ }
+
+ /**
+ * Update coord job definition.
+ *
+ * @param conf the conf
+ * @param jobId the job id
+ * @param dryrun the dryrun
+ * @param showDiff the show diff
+ * @return the string
+ * @throws CoordinatorEngineException the coordinator engine exception
+ */
+ public String updateJob(Configuration conf, String jobId, boolean dryrun, boolean showDiff)
+ throws CoordinatorEngineException {
+ try {
+ CoordUpdateXCommand update = new CoordUpdateXCommand(dryrun, conf, jobId, showDiff);
+ return update.call();
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ /**
+ * Return the status for a Job ID
+ *
+ * @param jobId job Id.
+ * @return the job's status
+ * @throws CoordinatorEngineException thrown if the job's status could not be obtained
+ */
+ @Override
+ public String getJobStatus(String jobId) throws CoordinatorEngineException {
+ try {
+ CoordinatorJobBean coordJob = CoordJobQueryExecutor.getInstance().get(
+ CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_STATUS, jobId);
+ return coordJob.getStatusStr();
+ }
+ catch (JPAExecutorException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ /**
+ * Return the status for an Action ID
+ *
+ * @param actionId action Id.
+ * @return the action's status
+ * @throws CoordinatorEngineException thrown if the action's status could not be obtained
+ */
+ public String getActionStatus(String actionId) throws CoordinatorEngineException {
+ try {
+ CoordinatorActionBean coordAction = CoordActionQueryExecutor.getInstance().get(
+ CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
+ return coordAction.getStatusStr();
+ }
+ catch (JPAExecutorException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ @Override
+ public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
+ try {
+ new CoordSLAAlertsDisableXCommand(id, actions, dates).call();
+
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ @Override
+ public void changeSLA(String id, String actions, String dates, String childIds, String newParams)
+ throws BaseEngineException {
+ Map<String, String> slaNewParams = null;
+
+ try {
+
+ if (newParams != null) {
+ slaNewParams = JobUtils.parseChangeValue(newParams);
+ }
+
+ new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call();
+
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ @Override
+ public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException {
+ try {
+ new CoordSLAAlertsEnableXCommand(id, actions, dates).call();
+
+ }
+ catch (CommandException e) {
+ throw new CoordinatorEngineException(e);
+ }
+ }
+
+ /**
+ * return a list of killed Coordinator job
+ *
+ * @param filter, the filter string for which the coordinator jobs are killed
+ * @param start, the starting index for coordinator jobs
+ * @param length, maximum number of jobs to be killed
+ * @return the list of jobs being killed
+ * @throws CoordinatorEngineException thrown if one or more of the jobs cannot be killed
+ */
+ public CoordinatorJobInfo killJobs(String filter, int start, int length) throws CoordinatorEngineException {
+ try {
+ Map<String, List<String>> filterMap = parseJobsFilter(filter);
+ CoordinatorJobInfo coordinatorJobInfo =
+ new BulkCoordXCommand(filterMap, start, length, OperationType.Kill).call();
+ if (coordinatorJobInfo == null) {
+ return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+ }
+ return coordinatorJobInfo;
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ /**
+ * return the jobs that've been suspended
+ * @param filter Filter for jobs that will be suspended, can be name, user, group, status, id or combination of any
+ * @param start Offset for the jobs that will be suspended
+ * @param length maximum number of jobs that will be suspended
+ * @return
+ * @throws CoordinatorEngineException
+ */
+ public CoordinatorJobInfo suspendJobs(String filter, int start, int length) throws CoordinatorEngineException {
+ try {
+ Map<String, List<String>> filterMap = parseJobsFilter(filter);
+ CoordinatorJobInfo coordinatorJobInfo =
+ new BulkCoordXCommand(filterMap, start, length, OperationType.Suspend).call();
+ if (coordinatorJobInfo == null) {
+ return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+ }
+ return coordinatorJobInfo;
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+
+ /**
+ * return the jobs that've been resumed
+ * @param filter Filter for jobs that will be resumed, can be name, user, group, status, id or combination of any
+ * @param start Offset for the jobs that will be resumed
+ * @param length maximum number of jobs that will be resumed
+ * @return
+ * @throws CoordinatorEngineException
+ */
+ public CoordinatorJobInfo resumeJobs(String filter, int start, int length) throws CoordinatorEngineException {
+ try {
+ Map<String, List<String>> filterMap = parseJobsFilter(filter);
+ CoordinatorJobInfo coordinatorJobInfo =
+ new BulkCoordXCommand(filterMap, start, length, OperationType.Resume).call();
+ if (coordinatorJobInfo == null) {
+ return new CoordinatorJobInfo(new ArrayList<CoordinatorJobBean>(), 0, 0, 0);
+ }
+ return coordinatorJobInfo;
+ }
+ catch (CommandException ex) {
+ throw new CoordinatorEngineException(ex);
+ }
+ }
+ /**
+ * Get coord action missing dependencies
+ * @param id jobID
+ * @param actions action list
+ * @param dates nominal time list
+ * @return pair of coord action bean and list of missing input dependencies.
+ * @throws CommandException
+ */
+ public List<Pair<CoordinatorActionBean, Map<String, ActionDependency>>> getCoordActionMissingDependencies(String id,
+ String actions, String dates) throws CommandException {
+ return new CoordActionMissingDependenciesXCommand(id, actions, dates).call();
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java
new file mode 100644
index 0000000..8a40e20
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/CoordinatorWfActionBean.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import org.apache.oozie.client.CoordinatorWfAction;
+import org.apache.oozie.client.rest.JsonBean;
+import org.apache.oozie.client.rest.JsonTags;
+import org.json.simple.JSONObject;
+
+public class CoordinatorWfActionBean implements CoordinatorWfAction, JsonBean{
+
+ private int actionNumber;
+
+ private WorkflowActionBean action;
+
+ private String strNullReason;
+
+ public CoordinatorWfActionBean(int actionNumber) {
+ this(actionNumber, null, null);
+ }
+
+ public CoordinatorWfActionBean(int actionNumber, WorkflowActionBean action, String nullReason) {
+ this.actionNumber = actionNumber;
+ this.action = action;
+ this.strNullReason = nullReason;
+ }
+
+ public int getActionNumber() {
+ return actionNumber;
+ }
+
+ public WorkflowActionBean getAction() {
+ return action;
+ }
+
+ public String getNullReason() {
+ return strNullReason;
+ }
+
+ public void setActionNumber(int actionNumber) {
+ this.actionNumber = actionNumber;
+ }
+
+ public void setAction(WorkflowActionBean action) {
+ this.action = action;
+ }
+
+ public void setNullReason(String nullReason) {
+ this.strNullReason = nullReason;
+ }
+
+ @Override
+ public JSONObject toJSONObject() {
+ return toJSONObject("GMT");
+ }
+
+ @Override
+ public JSONObject toJSONObject(String timeZoneId) {
+ JSONObject json = new JSONObject();
+ json.put(JsonTags.COORDINATOR_WF_ACTION_NUMBER, actionNumber);
+ json.put(JsonTags.COORDINATOR_WF_ACTION_NULL_REASON, strNullReason);
+ if (action != null) {
+ json.put(JsonTags.COORDINATOR_WF_ACTION, action.toJSONObject(timeZoneId));
+ }
+ else {
+ json.put(JsonTags.COORDINATOR_WF_ACTION, action);
+ }
+ return json;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index b03ad06..662e1ed 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -105,6 +105,7 @@ public enum ErrorCode {
E0609(XLog.OPS, "Missing [{0}] ORM file [{1}]"),
E0610(XLog.OPS, "Missing JPAService, StoreService cannot run without a JPAService"),
E0611(XLog.OPS, "SQL error in operation [{0}], {1}"),
+ E0612(XLog.OPS, "Could not get coordinator actions"),
E0700(XLog.STD, "XML error, {0}"),
E0701(XLog.STD, "XML schema error, {0}"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java
new file mode 100644
index 0000000..8536ed9
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordWfActionInfoXCommand.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorWfActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.client.CoordinatorWfAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.ParamChecker;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CoordWfActionInfoXCommand extends CoordinatorXCommand<List<CoordinatorWfActionBean>>{
+ /**
+ * This class gets the wf action info in coordinator by action name and coordinator job ID.
+ */
+ private static final String ACTION_INFO = "action.info";
+ private static final int DEFAULT_OFFSET = 1;
+ private static final int DEFAULT_LEN = 50;
+
+ private final String jobId;
+ private final String actionName;
+ private final int offset;
+ private final int len;
+ private List<CoordinatorActionBean> coordActions;
+ private JPAService jpaService = null;
+
+ public CoordWfActionInfoXCommand(String jobId, String actionName) {
+ this(jobId, actionName, DEFAULT_OFFSET, DEFAULT_LEN);
+ }
+
+ public CoordWfActionInfoXCommand(String jobId, String actionName, int offset, int len) {
+ super(ACTION_INFO, ACTION_INFO, 1);
+
+ this.jobId = ParamChecker.notEmpty(jobId, "jobId");
+ this.actionName = ParamChecker.notEmpty(actionName, "actionName");
+ this.offset = offset;
+ this.len = len;
+ }
+
+ @Override
+ protected List<CoordinatorWfActionBean> execute() throws CommandException {
+ List<CoordinatorWfActionBean> coordWfActions = new ArrayList<CoordinatorWfActionBean>();
+ for(CoordinatorActionBean coordAction : coordActions) {
+ String wfId = coordAction.getExternalId();
+ String nullReason = null;
+ WorkflowActionBean wfAction = null;
+ if (wfId != null) {
+ String wfActionId = wfId + "@" + actionName;
+ try {
+ wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(wfActionId, true));
+ if (wfAction == null) {
+ nullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason(actionName, wfId);
+ }
+ } catch (JPAExecutorException ex) {
+ throw new CommandException(ex);
+ }
+ } else {
+ nullReason = CoordinatorWfAction.NullReason.PARENT_NULL.getNullReason();
+ LOG.warn(nullReason);
+ wfAction = null;
+ }
+ int actionNumber = coordAction.getActionNumber();
+ CoordinatorWfActionBean coordWfAction = new CoordinatorWfActionBean(actionNumber, wfAction, nullReason);
+
+ coordWfActions.add(coordWfAction);
+ }
+ return coordWfActions;
+ }
+
+ /**
+ * (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#loadState()
+ **/
+ @Override
+ protected void loadState() throws CommandException {
+ jpaService = Services.get().get(JPAService.class);
+ if (jpaService != null) {
+ try {
+ coordActions = jpaService.execute(
+ new CoordJobGetActionsSubsetJPAExecutor(jobId, null, offset, len, false));
+ } catch (JPAExecutorException ex) {
+ LOG.error(ErrorCode.E0612);
+ throw new CommandException(ex);
+ }
+ }
+ else {
+ throw new CommandException(ErrorCode.E0610);
+ }
+ }
+
+ /**
+ * (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+ * no-op
+ **/
+ @Override
+ protected void verifyPrecondition() throws CommandException, PreconditionException {
+ }
+
+ /**
+ * (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#isLockRequired()
+ **/
+ @Override
+ protected boolean isLockRequired() {
+ return false;
+ }
+
+ /**
+ * (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#getEntityKey()
+ **/
+ @Override
+ public String getEntityKey() {
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
index 0b7f50d..f8ac11e 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
@@ -33,11 +33,19 @@ import org.apache.oozie.util.XLog;
*/
public class WorkflowActionGetJPAExecutor implements JPAExecutor<WorkflowActionBean> {
+ public XLog LOG = XLog.getLog(getClass());
+
private String wfActionId = null;
+ private final boolean isNullAcceptable;
public WorkflowActionGetJPAExecutor(String wfActionId) {
+ this(wfActionId, false);
+ }
+
+ public WorkflowActionGetJPAExecutor(String wfActionId, boolean isNullAcceptable) {
ParamChecker.notNull(wfActionId, "wfActionId");
this.wfActionId = wfActionId;
+ this.isNullAcceptable = isNullAcceptable;
}
/* (non-Javadoc)
@@ -69,7 +77,13 @@ public class WorkflowActionGetJPAExecutor implements JPAExecutor<WorkflowActionB
return bean;
}
else {
- throw new JPAExecutorException(ErrorCode.E0605, wfActionId);
+ if (isNullAcceptable) {
+ LOG.warn("Could not get workflow action {0}", wfActionId);
+ return null;
+ }
+ else {
+ throw new JPAExecutorException(ErrorCode.E0605, wfActionId);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
index 03acbc1..a9ea615 100644
--- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java
@@ -48,6 +48,8 @@ public abstract class BaseJobServlet extends JsonRestServlet {
private static final ResourceInfo RESOURCES_INFO[] = new ResourceInfo[1];
+ final static String NOT_SUPPORTED_MESSAGE = "Not supported in this version";
+
static {
RESOURCES_INFO[0] = new ResourceInfo("*", Arrays.asList("PUT", "GET"), Arrays.asList(new ParameterInfo(
RestConstants.ACTION_PARAM, String.class, true, Arrays.asList("PUT")), new ParameterInfo(
@@ -368,6 +370,12 @@ public abstract class BaseJobServlet extends JsonRestServlet {
startCron();
sendJsonResponse(response, HttpServletResponse.SC_OK, json);
}
+ else if (show.equals(RestConstants.JOB_SHOW_WF_ACTIONS_IN_COORD)) {
+ stopCron();
+ JSONObject json = getWfActionByJobIdAndName(request, response);
+ startCron();
+ sendJsonResponse(response, HttpServletResponse.SC_OK, json);
+ }
else {
throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303,
RestConstants.JOB_SHOW_PARAM, show);
@@ -607,4 +615,18 @@ public abstract class BaseJobServlet extends JsonRestServlet {
*/
abstract JSONObject getCoordActionMissingDependencies(HttpServletRequest request, HttpServletResponse response)
throws XServletException, IOException;
+
+ /**
+ * get wf actions by name in coordinator job
+ *
+ * @param request the request
+ * @param response the response
+ * @return the JSON object
+ * @throws XServletException the x servlet exception
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response)
+ throws XServletException, IOException {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, NOT_SUPPORTED_MESSAGE);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
index 6c30f5d..c2b90c1 100644
--- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java
@@ -35,6 +35,8 @@ import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorActionInfo;
import org.apache.oozie.CoordinatorEngine;
import org.apache.oozie.CoordinatorEngineException;
+import org.apache.oozie.CoordinatorWfActionBean;
+import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.DagEngine;
import org.apache.oozie.DagEngineException;
import org.apache.oozie.ErrorCode;
@@ -50,6 +52,7 @@ import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.CoordinatorEngineService;
import org.apache.oozie.service.DagEngineService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.util.Pair;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
@@ -381,4 +384,46 @@ public class V2JobServlet extends V1JobServlet {
}
}
+ @Override
+ protected JSONObject getWfActionByJobIdAndName(HttpServletRequest request, HttpServletResponse response)
+ throws XServletException, IOException {
+ CoordinatorEngine coordEngine = Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(
+ getUser(request));
+ String jobId = getResourceName(request);
+ String action = request.getParameter(RestConstants.ACTION_NAME_PARAM);
+ String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
+ String lenStr = request.getParameter(RestConstants.LEN_PARAM);
+ String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
+ timeZoneId = (timeZoneId == null) ? "GMT" : timeZoneId;
+
+ if (action == null) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST,
+ ErrorCode.E0305, RestConstants.ACTION_NAME_PARAM);
+ }
+
+ int offset = (startStr != null) ? Integer.parseInt(startStr) : 1;
+ offset = (offset < 1) ? 1 : offset;
+ /**
+ * set default number of wf actions to be retrieved to
+ * default number of coordinator actions to be retrieved
+ **/
+ int defaultLen = ConfigurationService.getInt(COORD_ACTIONS_DEFAULT_LENGTH);
+ int len = (lenStr != null) ? Integer.parseInt(lenStr) : 0;
+ len = getCoordinatorJobLength(defaultLen, len);
+
+ try {
+ JSONObject json = new JSONObject();
+ List<CoordinatorWfActionBean> coordWfActions = coordEngine.getWfActionByJobIdAndName(jobId, action, offset, len);
+ JSONArray array = new JSONArray();
+ for (CoordinatorWfActionBean coordWfAction : coordWfActions) {
+ array.add(coordWfAction.toJSONObject(timeZoneId));
+ }
+ json.put(JsonTags.COORDINATOR_JOB_ID, jobId);
+ json.put(JsonTags.COORDINATOR_WF_ACTIONS, array);
+ return json;
+ }
+ catch (CoordinatorEngineException ex) {
+ throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
index 564db2a..dbc160f 100644
--- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
+++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
@@ -542,7 +542,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockCoordinatorEngineService.JOB_ID + "1", "-action", "1" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_ACTION_KILL, MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -566,7 +566,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockCoordinatorEngineService.JOB_ID + "1", "-date", "2009-12-15T01:00Z::2009-12-16T01:00Z" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_ACTION_KILL, MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -608,7 +608,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-action", "1" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(1));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -632,7 +632,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-date", "2009-12-15T01:00Z::2009-12-16T01:00Z" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(1));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -656,7 +656,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-action", "0", "-refresh" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(0));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(0));
return null;
}
});
@@ -679,7 +679,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-action", "0", "-nocleanup" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_COORD_ACTION_RERUN, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(0));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(0));
return null;
}
});
@@ -703,7 +703,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-date", "2009-12-15T01:00Z", "-action", "1" };
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -726,7 +726,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockCoordinatorEngineService.JOB_ID + "1" + MockDagEngineService.JOB_ID_END};
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -751,7 +751,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-rerun", MockCoordinatorEngineService.JOB_ID + "0" };
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -776,7 +776,7 @@ public class TestOozieCLI extends DagServletTestCase {
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -790,7 +790,7 @@ public class TestOozieCLI extends DagServletTestCase {
String[] args = new String[]{"job", "-oozie", oozieUrl, "-ignore", MockCoordinatorEngineService.JOB_ID + "1"};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_ACTION_CHANGE, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(1));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1));
// negative test for "oozie job -ignore <non-existent coord>"
MockCoordinatorEngineService.reset();
@@ -799,7 +799,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockDagEngineService.JOB_ID + (MockCoordinatorEngineService.coordJobs.size() + 1)};
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -814,7 +814,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockCoordinatorEngineService.JOB_ID + "1", "-action", "1"};
assertEquals(0, new OozieCLI().run(args));
assertEquals(RestConstants.JOB_ACTION_IGNORE, MockCoordinatorEngineService.did);
- assertTrue(MockCoordinatorEngineService.started.get(1));
+ assertTrue(MockCoordinatorEngineService.startedCoordJobs.get(1));
// negative test for "oozie job -ignore <non-existent coord> -action 1"
MockCoordinatorEngineService.reset();
@@ -822,7 +822,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockDagEngineService.JOB_ID + (MockCoordinatorEngineService.coordJobs.size() + 1), "-action", "1" };
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
// negative test for "oozie job -ignore <id> -action (action is empty)"
MockCoordinatorEngineService.reset();
@@ -830,7 +830,7 @@ public class TestOozieCLI extends DagServletTestCase {
MockCoordinatorEngineService.JOB_ID, "-action", ""};
assertEquals(-1, new OozieCLI().run(args));
assertNull(MockCoordinatorEngineService.did);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
@@ -1417,7 +1417,7 @@ public class TestOozieCLI extends DagServletTestCase {
"-oozie", oozieUrl, "-Doozie.proxysubmission=true" };
assertEquals(0, new OozieCLI().run(args));
assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_ACTION_DRYRUN);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -1432,7 +1432,7 @@ public class TestOozieCLI extends DagServletTestCase {
String[] args = new String[] { "job", "-update", "aaa", "-oozie", oozieUrl };
assertEquals(-1, new OozieCLI().run(args));
assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_COORD_UPDATE );
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -1450,7 +1450,7 @@ public class TestOozieCLI extends DagServletTestCase {
assertEquals(-1, new OozieCLI().run(args));
assertEquals(MockCoordinatorEngineService.did, RestConstants.JOB_COORD_UPDATE + "&"
+ RestConstants.JOB_ACTION_DRYRUN);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
@@ -1691,7 +1691,7 @@ public class TestOozieCLI extends DagServletTestCase {
String[] args = new String[] { "job", "-missingdeps", "aaa-C", "-oozie", oozieUrl };
assertEquals(0, new OozieCLI().run(args));
assertEquals(MockCoordinatorEngineService.did, RestConstants.COORD_ACTION_MISSING_DEPENDENCIES);
- assertFalse(MockCoordinatorEngineService.started.get(1));
+ assertFalse(MockCoordinatorEngineService.startedCoordJobs.get(1));
return null;
}
});
http://git-wip-us.apache.org/repos/asf/oozie/blob/40618313/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java
new file mode 100644
index 0000000..ceaa707
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordWfActionInfoXCommand.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.CoordinatorWfActionBean;
+import org.apache.oozie.client.CoordinatorWfAction;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+public class TestCoordWfActionInfoXCommand extends XDataTestCase {
+ Services services;
+
+ private CoordinatorJobBean coordJob;
+ private List<WorkflowJobBean> wfJobs;
+ private List<CoordinatorActionBean> coordActions;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+
+ createTestData();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+ /**
+ * init the test case.
+ * 1 coordJob
+ * 5 coordAction created by the coordJob, while the 5th coordAction's workflow instance is null
+ * 4 wfJob match the 1st ~ 4th coordAction
+ * the 1st - 3rd wfAction has a wfAction named 'aa' each, but the 4th desn't.
+ */
+ private void createTestData() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull("Missing jpa service", jpaService);
+
+ coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+ wfJobs = new ArrayList<WorkflowJobBean>();
+ coordActions = new ArrayList<CoordinatorActionBean>();
+
+ for(int i = 0; i < 4; i++) {
+ WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED);
+ wfJobs.add(wfJob);
+ }
+ for(int i = 0; i < 4; i++) {
+ CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), (i+1),
+ CoordinatorAction.Status.SUCCEEDED,"coord-action-get.xml", wfJobs.get(i).getId(), "SUCCEEDED", 0);
+ coordActions.add(coordAction);
+ }
+
+ //add a coordAction that doesnt create workflow instance yet
+ CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 5,
+ CoordinatorAction.Status.SUCCEEDED,"coord-action-get.xml", null, null, 0);
+ coordActions.add(coordAction);
+
+ //set the NominalTime,in order to keep the order of the coordAction.
+ for(int i = 0; i < 5; i++) {
+ setCoordActionNominalTime(coordActions.get(i).getId(), (i+1) * 1000);
+ }
+
+ //create the case that the 4th wfJob doesnt have a action named "aa"
+ for(int i = 0; i < 4; i++) {
+ String name = (i == 3) ? "bb" : "aa";
+ addRecordToWfActionTable(wfJobs.get(i).getId(), name, WorkflowAction.Status.DONE);
+ }
+ }
+
+ public void testNormalCase() throws Exception {
+ int offset = 2, len = 2;
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", offset, len).call();
+ assertEquals(2, coordWfActions.size());
+ List<String> wfIds = Arrays.asList(wfJobs.get(1).getId(), wfJobs.get(2).getId());
+
+ for(int i = 0; i < coordWfActions.size(); i++) {
+ CoordinatorWfActionBean coordWfAction = coordWfActions.get(i);
+ WorkflowActionBean wfAction = coordWfAction.getAction();
+
+ assertEquals(i + offset, coordWfActions.get(i).getActionNumber());
+ assertEquals(wfIds.get(i), wfAction.getWfId());
+ assertEquals(null, coordWfAction.getNullReason());
+ }
+ }
+
+ public void testActionMissing() throws CommandException{
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 3).call();
+ assertEquals(3, coordWfActions.size());
+
+ assertEquals(wfJobs.get(1).getId(), coordWfActions.get(0).getAction().getWfId());
+ assertEquals(wfJobs.get(2).getId(), coordWfActions.get(1).getAction().getWfId());
+ CoordinatorWfActionBean coordWfAction = coordWfActions.get(2);
+ assertEquals(4, coordWfAction.getActionNumber());
+ assertEquals(null, coordWfAction.getAction());
+ String expectNullReason = CoordinatorWfAction.NullReason.ACTION_NULL.getNullReason("aa", wfJobs.get(3).getId());
+ assertEquals(expectNullReason, coordWfAction.getNullReason());
+ }
+
+ public void testWorkflowInstanceMissing() throws CommandException {
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 4).call();
+ assertEquals(4, coordWfActions.size());
+
+ CoordinatorWfActionBean coordWfAction = coordWfActions.get(3);
+ assertEquals(5, coordWfAction.getActionNumber());
+ assertEquals(null, coordWfAction.getAction());
+ String expectNullReason = CoordinatorWfAction.NullReason.PARENT_NULL.getNullReason();
+ assertEquals(expectNullReason, coordWfAction.getNullReason());
+ }
+
+ //test offset out of Range
+ public void testOffsetOutOfRange() throws CommandException {
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 6, 4).call();
+ assertEquals(0, coordWfActions.size());
+ }
+
+ //test len out of Range
+ public void testLenOutOfRange() throws CommandException {
+ int offset = 2;
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa", 2, 19).call();
+ assertEquals(4, coordWfActions.size());
+
+ for(int i = 0; i < coordWfActions.size(); i++) {
+ assertEquals(i + offset, coordWfActions.get(i).getActionNumber());
+ }
+ }
+
+ //test default offset and len
+ private void _testDefaultOffsetAndLen() throws CommandException {
+ List<CoordinatorWfActionBean> coordWfActions = new CoordWfActionInfoXCommand(coordJob.getId(), "aa").call();
+ assertEquals(5, coordWfActions.size());
+
+ for(int i = 0; i < coordWfActions.size(); i++) {
+ assertEquals(i + 1, coordWfActions.get(i).getActionNumber());
+ }
+ }
+}