You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 23:32:59 UTC
svn commit: r1376264 - in /incubator/oozie/trunk: ./
client/src/main/java/org/apache/oozie/cli/
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/rest/
core/src/main/java/org/apache/oozie/ core/src/main/java/org...
Author: virag
Date: Wed Aug 22 21:32:58 2012
New Revision: 1376264
URL: http://svn.apache.org/viewvc?rev=1376264&view=rev
Log:
OOZIE-848 Bulk Monitoring API - Consolidated view of jobs (mona via virag)
Added:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
Modified:
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed Aug 22 21:32:58 2012
@@ -50,6 +50,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.oozie.BuildInfo;
import org.apache.oozie.client.AuthOozieClient;
+import org.apache.oozie.client.BulkResponse;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
@@ -133,6 +134,8 @@ public class OozieCLI {
public static final String INFO_TIME_ZONES_OPTION = "timezones";
+ public static final String BULK_OPTION = "bulk";
+
private static final String[] OOZIE_HELP = {
"the env variable '" + ENV_OOZIE_URL + "' is used as default value for the '-" + OOZIE_OPTION + "' option",
"the env variable '" + ENV_OOZIE_TIME_ZONE + "' is used as default value for the '-" + TIME_ZONE_OPTION + "' option",
@@ -327,6 +330,10 @@ public class OozieCLI {
"use time zone with the specified ID (default GMT).\nSee 'oozie info -timezones' for a list");
Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user");
+ Option bulkMonitor = new Option(BULK_OPTION, true, "key-value pairs to filter bulk jobs response. e.g. bundle=<B>\\;" +
+ "coordinators=<C>\\;actionstatus=<S>\\;startcreatedtime=<SC>\\;endcreatedtime=<EC>\\;" +
+ "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; coordinators and actionstatus can be multiple comma separated values" +
+ "bundle and coordinators are 'names' of those jobs. Bundle name is mandatory, other params are optional");
start.setType(Integer.class);
len.setType(Integer.class);
Options jobsOptions = new Options();
@@ -340,6 +347,7 @@ public class OozieCLI {
jobsOptions.addOption(filter);
jobsOptions.addOption(jobtype);
jobsOptions.addOption(verbose);
+ jobsOptions.addOption(bulkMonitor);
addAuthOptions(jobsOptions);
return jobsOptions;
}
@@ -1088,6 +1096,7 @@ public class OozieCLI {
private static final String WORKFLOW_ACTION_FORMATTER = "%-78s%-10s%-23s%-11s%-10s";
private static final String COORD_ACTION_FORMATTER = "%-43s%-10s%-37s%-10s%-21s%-21s";
+ private static final String BULK_RESPONSE_FORMATTER = "%-41s%-41s%-37s%-37s%-13s%-21s%-24s";
private void printJob(WorkflowJob job, String timeZoneId, boolean verbose) throws IOException {
System.out.println("Job ID : " + maskIfNull(job.getId()));
@@ -1170,8 +1179,13 @@ public class OozieCLI {
String timeZoneId = getTimeZoneId(commandLine);
jobtype = (jobtype != null) ? jobtype : "wf";
int len = Integer.parseInt((s != null) ? s : "0");
+ String bulkFilterString = commandLine.getOptionValue(BULK_OPTION);
+
try {
- if (jobtype.toLowerCase().contains("wf")) {
+ if (bulkFilterString != null) {
+ printBulkJobs(wc.getBulkInfo(bulkFilterString, start, len), timeZoneId);
+ }
+ else if (jobtype.toLowerCase().contains("wf")) {
printJobs(wc.getJobsInfo(filter, start, len), timeZoneId, commandLine.hasOption(VERBOSE_OPTION));
}
else if (jobtype.toLowerCase().startsWith("coord")) {
@@ -1233,6 +1247,24 @@ public class OozieCLI {
}
}
+ private void printBulkJobs(List<BulkResponse> jobs, String timeZoneId) throws IOException {
+ if (jobs != null && jobs.size() > 0) {
+ System.out.println(String.format(BULK_RESPONSE_FORMATTER, "Bundle Name", "Coordinator Name",
+ "Coord Action ID", "External ID", "Status", "Created Time", "Error Message"));
+
+ for (BulkResponse response : jobs) {
+ System.out.println(String.format(BULK_RESPONSE_FORMATTER, maskIfNull((response.getBundle()).getAppName()),
+ maskIfNull((response.getCoordinator()).getAppName()), maskIfNull((response.getAction()).getId()),
+ maskIfNull((response.getAction()).getExternalId()), (response.getAction()).getStatus(),
+ maskDate((response.getAction()).getCreatedTime(), timeZoneId, false), (response.getAction()).getErrorMessage()));
+ System.out.println(RULER);
+ }
+ }
+ else {
+ System.out.println("Bulk request criteria did not match any coordinator actions");
+ }
+ }
+
private void printBundleJobs(List<BundleJob> jobs, String timeZoneId, boolean verbose) throws IOException {
if (jobs != null && jobs.size() > 0) {
if (verbose) {
Added: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java (added)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+/**
+ * Bean that represents one entry of a bulk monitoring response, exposing the bundle job,
+ * coordinator job and coordinator action that make up the entry.
+ */
+
+public interface BulkResponse {
+
+ /**
+ * Return the coordinator comprising this bulk object
+ *
+ * @return the coordinator.
+ */
+ CoordinatorJob getCoordinator();
+
+ /**
+ * Return the bundle comprising this bulk object
+ *
+ * @return the bundle job.
+ */
+ BundleJob getBundle();
+
+ /**
+ * Return the coordinator action comprising this bulk object
+ *
+ * @return the coordinator action.
+ */
+ CoordinatorAction getAction();
+}
Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Aug 22 21:32:58 2012
@@ -1062,6 +1062,32 @@ public class OozieClient {
}
}
+ private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
+
+ BulkResponseStatus(String filter, int start, int len) {
+ super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
+ RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
+ }
+
+ @Override
+ protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
+ conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
+ if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+ Reader reader = new InputStreamReader(conn.getInputStream());
+ JSONObject json = (JSONObject) JSONValue.parse(reader);
+ JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
+ if (results == null) {
+ results = new JSONArray();
+ }
+ return JsonToBean.createBulkResponseList(results);
+ }
+ else {
+ handleError(conn);
+ }
+ return null;
+ }
+ }
+
private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
@@ -1364,6 +1390,10 @@ public class OozieClient {
return new BundleJobsStatus(filter, start, len).call();
}
+ public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
+ return new BulkResponseStatus(filter, start, len).call();
+ }
+
private class GetQueueDump extends ClientCallable<List<String>> {
GetQueueDump() {
super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());
Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed Aug 22 21:32:58 2012
@@ -182,7 +182,15 @@ public interface JsonTags {
public static final Object BUNDLE_JOB_TOTAL = "total";
public static final Object BUNDLE_JOB_OFFSET = "offset";
public static final Object BUNDLE_JOB_LEN = "len";
-
+
+ public static final String BULK_RESPONSE_BUNDLE = "bulkbundle";
+ public static final String BULK_RESPONSE_COORDINATOR = "bulkcoord";
+ public static final String BULK_RESPONSE_ACTION = "bulkaction";
+ public static final Object BULK_RESPONSES = "bulkresponses";
+ public static final Object BULK_RESPONSE_TOTAL = "total";
+ public static final Object BULK_RESPONSE_OFFSET = "offset";
+ public static final Object BULK_RESPONSE_LEN = "len";
+
public static final String AVAILABLE_TIME_ZONES = "available-timezones";
public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
public static final String TIME_ZONE_ID = "timezoneId";
Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed Aug 22 21:32:58 2012
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.client.rest;
+import org.apache.oozie.client.BulkResponse;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
@@ -63,6 +64,7 @@ public class JsonToBean {
private static final Map<String, Property> COORD_JOB = new HashMap<String, Property>();
private static final Map<String, Property> COORD_ACTION = new HashMap<String, Property>();
private static final Map<String, Property> BUNDLE_JOB = new HashMap<String, Property>();
+ private static final Map<String, Property> BULK_RESPONSE = new HashMap<String, Property>();
static {
WF_ACTION.put("getId", new Property(JsonTags.WORKFLOW_ACTION_ID, String.class));
@@ -169,6 +171,11 @@ public class JsonToBean {
BUNDLE_JOB.put("getConsoleUrl",new Property(JsonTags.BUNDLE_JOB_CONSOLE_URL, String.class));
BUNDLE_JOB.put("getCoordinators",new Property(JsonTags.BUNDLE_COORDINATOR_JOBS, CoordinatorJob.class, true));
BUNDLE_JOB.put("toString", new Property(JsonTags.TO_STRING, String.class));
+
+ BULK_RESPONSE.put("getBundle", new Property(JsonTags.BULK_RESPONSE_BUNDLE, BundleJob.class, true));
+ BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
+ BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
+
}
/**
@@ -369,4 +376,21 @@ public class JsonToBean {
}
return list;
}
+
+ /**
+ * Wraps every element of a JSON array in a dynamic proxy implementing {@link BulkResponse}.
+ *
+ * @param json json array whose elements are cast to JSON objects.
+ * @return a list of bulk response beans, in the iteration order of the array.
+ */
+ public static List<BulkResponse> createBulkResponseList(JSONArray json) {
+ final ClassLoader loader = JsonToBean.class.getClassLoader();
+ List<BulkResponse> beans = new ArrayList<BulkResponse>();
+ for (Object element : json) {
+ JsonInvocationHandler handler = new JsonInvocationHandler(BULK_RESPONSE, (JSONObject) element);
+ Object proxy = Proxy.newProxyInstance(loader, new Class[]{BulkResponse.class}, handler);
+ beans.add((BulkResponse) proxy);
+ }
+ return beans;
+ }
}
Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed Aug 22 21:32:58 2012
@@ -109,6 +109,8 @@ public interface RestConstants {
public static final String JOBS_FILTER_PARAM = "filter";
+ public static final String JOBS_BULK_PARAM = "bulk";
+
public static final String JOBS_EXTERNAL_ID_PARAM = "external-id";
public static final String ADMIN_STATUS_RESOURCE = "status";
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie;
+
+import java.util.List;
+
+import org.apache.oozie.client.rest.BulkResponseImpl;
+
+/**
+ * Class that stores the entire retrieved info from the bulk request command
+ * along with params for pagination of the results
+ * Query execution in BulkJPAExecutor returns an object of BulkResponseInfo class
+ */
+public class BulkResponseInfo {
+ // all state is assigned once by the constructor, so the fields are final
+ private final int start;
+ private final int len;
+ private final long total;
+ private final List<BulkResponseImpl> responses;
+
+ /**
+ * Create a bulk response info bean.
+ *
+ * @param responses bulk responses being returned.
+ * @param start bulk entries offset.
+ * @param len number of bulk entries.
+ * @param total total bulk entries.
+ */
+ public BulkResponseInfo(List<BulkResponseImpl> responses, int start, int len, long total) {
+ this.start = start;
+ this.len = len;
+ this.total = total;
+ this.responses = responses;
+ }
+
+ /**
+ * Return the bulk responses being returned.
+ *
+ * @return the bulk responses being returned.
+ */
+ public List<BulkResponseImpl> getResponses() {
+ return responses;
+ }
+
+ /**
+ * Return the offset of the bulk entries being returned.
+ * <p/>
+ * For pagination purposes.
+ *
+ * @return the offset of the bulk entries being returned.
+ */
+ public int getStart() {
+ return start;
+ }
+
+ /**
+ * Return the number of the bulk entries being returned.
+ * <p/>
+ * For pagination purposes.
+ *
+ * @return the number of the bulk entries being returned.
+ */
+ public int getLen() {
+ return len;
+ }
+
+ /**
+ * Return the total number of bulk entries.
+ * <p/>
+ * For pagination purposes.
+ *
+ * @return the total number of bulk entries.
+ */
+ public long getTotal() {
+ return total;
+ }
+}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java Wed Aug 22 21:32:58 2012
@@ -19,6 +19,7 @@ package org.apache.oozie;
import java.io.IOException;
import java.io.Writer;
+import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@@ -29,10 +30,13 @@ import java.util.Set;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.command.BulkJobsXCommand;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
@@ -46,6 +50,7 @@ import org.apache.oozie.command.bundle.B
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XLogStreamer;
@@ -357,4 +362,99 @@ public class BundleEngine extends BaseEn
return map;
}
+ /**
+ * Run the bulk monitoring command described by a filter string.
+ *
+ * @param bulkFilter the filter string, parsed with {@link #parseBulkFilter(String)}
+ * @param start start location for paging
+ * @param len total length to get
+ * @return bulk job info
+ * @throws BundleEngineException thrown if the filter cannot be parsed or the command fails
+ */
+ public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
+ // parsing stays outside the try block: its BundleEngineException propagates unchanged
+ final BulkJobsXCommand command = new BulkJobsXCommand(parseBulkFilter(bulkFilter), start, len);
+ try {
+ return command.call();
+ }
+ catch (CommandException commandFailure) {
+ throw new BundleEngineException(commandFailure);
+ }
+ }
+
+ /**
+ * Parse filter string to a map with key = filter name and values = filter values
+ * Allowed keys are the names held in BulkResponseImpl.BULK_FILTER_NAMES
+ *
+ * @param bulkParams the filter string: ';' separated name=value pairs, values may be ',' separated
+ * @return filter key-value pair map
+ * @throws BundleEngineException thrown if failed to parse filter string, or if the
+ * mandatory bundle name is missing (including when bulkParams is null)
+ */
+ public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
+
+ Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
+ // Functionality can be extended to different job levels - TODO extend filter parser and query
+ // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
+ // guard the incoming string (not the freshly created map) so a null filter
+ // does not reach StringTokenizer and fail with a NullPointerException
+ if (bulkParams != null) {
+ StringTokenizer st = new StringTokenizer(bulkParams, ";");
+ while (st.hasMoreTokens()) {
+ String token = st.nextToken();
+ if (token.contains("=")) {
+ String[] pair = token.split("=");
+ if (pair.length != 2) {
+ throw new BundleEngineException(ErrorCode.E0420, token,
+ "elements must be name=value pairs");
+ }
+ pair[0] = pair[0].toLowerCase();
+ String[] values = pair[1].split(",");
+ if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
+ throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
+ pair[0]));
+ }
+ // special check and processing for time related params
+ if (pair[0].contains("time")) {
+ try {
+ DateUtils.parseDateUTC(pair[1]);
+ }
+ catch (ParseException e) {
+ throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
+ "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
+ DateUtils.ISO8601_UTC_MASK));
+ }
+ }
+ // special check for action status param
+ // TODO: when extended for levels other than coord action, check against corresponding level's Status values
+ if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
+ for(String value : values) {
+ try {
+ // validate the trimmed value, since the trimmed value is what gets stored below
+ CoordinatorAction.Status.valueOf(value.trim());
+ }
+ catch (IllegalArgumentException ex) {
+ throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
+ "invalid action status [{0}]", value));
+ }
+ }
+ }
+ // eventually adding into map for all cases e.g. names, times, status
+ List<String> list = bulkFilter.get(pair[0]);
+ if (list == null) {
+ list = new ArrayList<String>();
+ bulkFilter.put(pair[0], list);
+ }
+ for(String value : values) {
+ value = value.trim();
+ if(value.isEmpty()) {
+ throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
+ }
+ list.add(value);
+ }
+ } else {
+ throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
+ }
+ } // end while over tokens
+ } // end null guard
+ // bundle name is mandatory; checked outside the null guard so a null filter is rejected too
+ if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
+ throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
+ }
+ return bulkFilter;
+ }
}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.rest;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.BulkResponse;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Server-side implementation class of the client interface BulkResponse
+ * Declares all the bulk request specific user parameters and handling as JSON object
+ */
+public class BulkResponseImpl implements BulkResponse, JsonBean {
+ private BundleJobBean bundle;
+ private CoordinatorJobBean coordinator;
+ private CoordinatorActionBean action;
+
+ public static final String BULK_FILTER_BUNDLE_NAME = "bundle";
+ public static final String BULK_FILTER_COORD_NAME = "coordinators";
+ public static final String BULK_FILTER_LEVEL = "filterlevel";
+ public static final String BULK_FILTER_STATUS = "actionstatus";
+ public static final String BULK_FILTER_START_CREATED_EPOCH = "startcreatedtime";
+ public static final String BULK_FILTER_END_CREATED_EPOCH = "endcreatedtime";
+ public static final String BULK_FILTER_START_NOMINAL_EPOCH = "startscheduledtime";
+ public static final String BULK_FILTER_END_NOMINAL_EPOCH = "endscheduledtime";
+ public static final String BULK_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:SS'Z'";
+
+ public static final Set<String> BULK_FILTER_NAMES = new HashSet<String>();
+
+ static {
+
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD_NAME);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
+ BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
+
+ }
+
+ /**
+ * Construct JSON object using the bulk request object and the associated tags
+ */
+ public JSONObject toJSONObject() {
+ return toJSONObject("GMT");
+ }
+
+ /**
+ * Construct JSON object using the bulk request object and the associated tags
+ */
+ @SuppressWarnings("unchecked")
+ public JSONObject toJSONObject(String timeZoneId) {
+ JSONObject json = new JSONObject();
+
+ json.put(JsonTags.BULK_RESPONSE_BUNDLE, bundle.toJSONObject());
+ json.put(JsonTags.BULK_RESPONSE_COORDINATOR, coordinator.toJSONObject());
+ json.put(JsonTags.BULK_RESPONSE_ACTION, action.toJSONObject());
+
+ json.put(JsonTags.BUNDLE_JOB_NAME, bundle.getAppName());
+ json.put(JsonTags.BUNDLE_JOB_ID, bundle.getId());
+ json.put(JsonTags.BUNDLE_JOB_STATUS, bundle.getStatus().toString());
+
+ json.put(JsonTags.COORDINATOR_JOB_NAME, coordinator.getAppName());
+ json.put(JsonTags.COORDINATOR_JOB_STATUS, coordinator.getStatus().toString());
+
+ json.put(JsonTags.COORDINATOR_ACTION_ID, action.getId());
+ json.put(JsonTags.COORDINATOR_JOB_ID, action.getJobId());
+ json.put(JsonTags.COORDINATOR_ACTION_NUMBER, action.getActionNumber());
+ json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, action.getExternalId());
+ json.put(JsonTags.COORDINATOR_ACTION_STATUS, action.getStatus().toString());
+ json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, action.getExternalStatus());
+ json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, action.getErrorCode());
+ json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, action.getErrorMessage());
+ json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, action.getCreatedTime().toString());
+ json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, action.getNominalTime().toString());
+ json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, action.getMissingDependencies());
+
+ return json;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.client.BulkResponse#getBundle()
+ */
+ @Override
+ public BundleJobBean getBundle() {
+ return bundle;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.client.BulkResponse#getCoordinator()
+ */
+ @Override
+ public CoordinatorJobBean getCoordinator() {
+ return coordinator;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.client.BulkResponse#getAction()
+ */
+ @Override
+ public CoordinatorActionBean getAction() {
+ return action;
+ }
+
+ /**
+ * Sets the bundle comprising this bulk response object
+ * @param BundleJobBean
+ */
+ public void setBundle(BundleJobBean bj) {
+ this.bundle = bj;
+ }
+
+ /**
+ * Sets the coordinator comprising this bulk response object
+ * @param CoordinatorJobBean
+ */
+ public void setCoordinator(CoordinatorJobBean cj) {
+ this.coordinator = cj;
+ }
+
+ /**
+ * Sets the coord action comprising this bulk response object
+ * @param CoordinatorActionBean
+ */
+ public void setAction(CoordinatorActionBean ca) {
+ this.action = ca;
+ }
+
+ /**
+ * Convert a nodes list into a JSONArray.
+ *
+ * @param actions nodes list.
+ * @param timeZoneId time zone to use for dates in the JSON array.
+ * @return the corresponding JSON array.
+ */
+ @SuppressWarnings("unchecked")
+ public static JSONArray toJSONArray(List<? extends BulkResponseImpl> responses, String timeZoneId) {
+ JSONArray array = new JSONArray();
+ for (BulkResponseImpl response : responses) {
+ array.add(response.toJSONObject(timeZoneId));
+ }
+ return array;
+ }
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.BulkJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+/**
+ * The command to get the bulk monitoring view (bundle, coordinator and coordinator action info)
+ * for the jobs selected by the given bulk filter. The actual lookup is delegated to
+ * {@link BulkJPAExecutor}.
+ */
+public class BulkJobsXCommand extends XCommand<BulkResponseInfo> {
+    private final Map<String, List<String>> bulkParams;
+    private final int start;
+    private final int len;
+
+    /**
+     * The constructor for BulkJobsXCommand
+     *
+     * @param filter the parsed bulk filter (filter name to list of values), handed unchanged to the
+     *        JPA executor
+     * @param start start location for paging
+     * @param length total length to get
+     */
+    public BulkJobsXCommand(Map<String, List<String>> filter, int start, int length) {
+        super("bundle.job.info", "bundle.job.info", 1);
+        this.bulkParams = filter;
+        this.start = start;
+        this.len = length;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+        // nothing to load: the query runs entirely inside execute()
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        // no preconditions for this read-only command
+    }
+
+    /**
+     * Runs the bulk query through the JPA service.
+     *
+     * @return the bulk response info produced by {@link BulkJPAExecutor}
+     * @throws CommandException with E0610 if the JPA service is not available, or wrapping any
+     *         XException raised while executing the query
+     * @see org.apache.oozie.command.XCommand#execute()
+     */
+    @Override
+    protected BulkResponseInfo execute() throws CommandException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        if (jpaService == null) {
+            // Fail explicitly: returning null here would only move the failure to the caller,
+            // which dereferences the result.
+            LOG.error(ErrorCode.E0610);
+            throw new CommandException(ErrorCode.E0610);
+        }
+        try {
+            return jpaService.execute(new BulkJPAExecutor(bulkParams, start, len));
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Load the coordinators for specified bundle in the Coordinator job bean
+ */
+public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
+ private Map<String, List<String>> bulkFilter;
+ // defaults
+ private int start = 1;
+ private int len = 50;
+
+ public static final String bundleQuery = "SELECT b.id, b.status FROM BundleJobBean b WHERE b.appName = :appName";
+ // Join query
+ public static final String actionQuery = "SELECT a.id, a.actionNumber, a.errorCode, a.errorMessage, a.externalId, " +
+ "a.externalStatus, a.status, a.createdTimestamp, a.nominalTimestamp, a.missingDependencies, " +
+ "c.id, c.appName, c.status FROM CoordinatorActionBean a, CoordinatorJobBean c " +
+ "WHERE a.jobId = c.id AND c.bundleId = :bundleId ORDER BY a.jobId, a.createdTimestamp";
+ public static final String countQuery = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c ";
+
+ public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
+ ParamChecker.notNull(bulkFilter, "bulkFilter");
+ this.bulkFilter = bulkFilter;
+ this.start = start;
+ this.len = len;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+ */
+ @Override
+ public String getName() {
+ return "BulkJPAExecutor";
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
+ BundleJobBean bundleBean = new BundleJobBean();
+
+ List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
+
+ try {
+
+ // Lightweight Query 1 on Bundle level to fetch the bundle job params corresponding to name
+ String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
+ Query q = em.createQuery(bundleQuery);
+ q.setParameter("appName", bundleName);
+ List<Object[]> bundles = (List<Object[]>) q.getResultList();
+ if(bundles.isEmpty()) {
+ throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: " + bundleName);
+ }
+ if(bundles.size() > 1) { // more than one bundles running with same name - ERROR. Fail fast
+ throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: " + bundleName);
+ }
+ bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
+
+ //Join query between coordinator job and coordinator action tables to get entries for specific bundleId only
+ StringBuilder conditionClause = new StringBuilder();
+ conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
+ conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
+ conditionClause.append(timesClause());
+ StringBuilder getActions = new StringBuilder(actionQuery);
+ int offset = getActions.indexOf("ORDER");
+ getActions.insert(offset-1, conditionClause);
+ q = em.createQuery(getActions.toString());
+ q.setParameter("bundleId", bundleBean.getId());
+ // pagination
+ q.setFirstResult(start - 1);
+ q.setMaxResults(len);
+
+ List<Object[]> response = q.getResultList();
+ for (Object[] r : response) {
+ BulkResponseImpl br = getResponseFromObject(bundleBean, r);
+ responseList.add(br);
+ }
+
+ StringBuilder getTotal = new StringBuilder(countQuery);
+ String query = q.toString();
+ getTotal.append(query.substring(query.indexOf("WHERE"), query.indexOf("ORDER")));
+ Query qTotal = em.createQuery(getTotal.toString());
+ qTotal.setParameter("bundleId", bundleBean.getId());
+ long total = ((Long) qTotal.getSingleResult()).longValue();
+
+ BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len , total);
+ return bulk;
+ }
+ catch (Exception e) {
+ throw new JPAExecutorException(ErrorCode.E0603, e);
+ }
+ }
+
+ // Form the where clause to filter by coordinator names
+ private StringBuilder coordNamesClause(List<String> coordNames) {
+ StringBuilder sb = new StringBuilder();
+ boolean firstVal = true;
+ for (String name : nullToEmpty(coordNames)) {
+ if(firstVal) {
+ sb.append(" AND c.appName IN (\'" + name + "\'");
+ firstVal = false;
+ } else {
+ sb.append(",\'" + name + "\'");
+ }
+ }
+ if(!firstVal) {
+ sb.append(") ");
+ }
+ return sb;
+ }
+
+ // Form the where clause to filter by coord action status
+ private StringBuilder statusClause(List<String> statuses) {
+ StringBuilder sb = new StringBuilder();
+ boolean firstVal = true;
+ for (String status : nullToEmpty(statuses)) {
+ if(firstVal) {
+ sb.append(" AND a.status IN (\'" + status + "\'");
+ firstVal = false;
+ } else {
+ sb.append(",\'" + status + "\'");
+ }
+ }
+ if(!firstVal) {
+ sb.append(") ");
+ } else { // statuses was null. adding default
+ sb.append(" AND a.status IN ('KILLED', 'FAILED') ");
+ }
+ return sb;
+ }
+
+ private StringBuilder timesClause() throws ParseException {
+ StringBuilder sb = new StringBuilder();
+ List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
+ if(times != null) {
+ sb.append(" AND a.createdTimestamp >= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+ }
+ times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
+ if(times != null) {
+ sb.append(" AND a.createdTimestamp <= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+ }
+ times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
+ if(times != null) {
+ sb.append(" AND a.nominalTimestamp >= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+ }
+ times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
+ if(times != null) {
+ sb.append(" AND a.nominalTimestamp <= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+ }
+ return sb;
+ }
+
+ private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
+ BulkResponseImpl bean = new BulkResponseImpl();
+ CoordinatorJobBean coordBean = new CoordinatorJobBean();
+ CoordinatorActionBean actionBean = new CoordinatorActionBean();
+ if (arr[0] != null) {
+ actionBean.setId((String) arr[0]);
+ }
+ if (arr[1] != null) {
+ actionBean.setActionNumber((Integer) arr[1]);
+ }
+ if (arr[2] != null) {
+ actionBean.setErrorCode((String) arr[2]);
+ }
+ if (arr[3] != null) {
+ actionBean.setErrorMessage((String) arr[3]);
+ }
+ if (arr[4] != null) {
+ actionBean.setExternalId((String) arr[4]);
+ }
+ if (arr[5] != null) {
+ actionBean.setExternalStatus((String) arr[5]);
+ }
+ if (arr[6] != null) {
+ actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
+ }
+ if (arr[7] != null) {
+ actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
+ }
+ if (arr[8] != null) {
+ actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
+ }
+ if (arr[9] != null) {
+ actionBean.setMissingDependencies((String) arr[9]);
+ }
+ if (arr[10] != null) {
+ coordBean.setId((String) arr[10]);
+ actionBean.setJobId((String) arr[10]);
+ }
+ if (arr[11] != null) {
+ coordBean.setAppName((String) arr[11]);
+ }
+ if (arr[12] != null) {
+ coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
+ }
+ bean.setBundle(bundleBean);
+ bean.setCoordinator(coordBean);
+ bean.setAction(actionBean);
+ return bean;
+ }
+
+ private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws Exception {
+ BundleJobBean bean = new BundleJobBean();
+ if (barr[0] != null) {
+ bean.setId((String) barr[0]);
+ } else {
+ throw new JPAExecutorException(ErrorCode.E0603, "bundleId returned by query is null - cannot retrieve bulk results");
+ }
+ bean.setAppName(name);
+ if (barr[1] != null) {
+ bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
+ }
+ return bean;
+ }
+
+ // null safeguard
+ public static List<String> nullToEmpty(List<String> input) {
+ return input == null ? Collections.<String>emptyList() : input;
+ }
+
+}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java Wed Aug 22 21:32:58 2012
@@ -52,7 +52,8 @@ public abstract class BaseJobsServlet ex
String.class, false, Arrays.asList("GET")),
new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM,
String.class, false, Arrays.asList("GET")),
-
+ new JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM,
+ String.class, false, Arrays.asList("GET")),
new JsonRestServlet.ParameterInfo(
RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class,
false, Arrays.asList("GET"))));
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java Wed Aug 22 21:32:58 2012
@@ -24,6 +24,8 @@ import javax.servlet.http.HttpServletReq
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BaseEngineException;
+import org.apache.oozie.BulkResponseInfo;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.BundleJobInfo;
import org.apache.oozie.CoordinatorEngine;
@@ -38,6 +40,7 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.BulkResponseImpl;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.CoordinatorEngineService;
@@ -127,17 +130,22 @@ public class V1JobsServlet extends BaseJ
@Override
protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
JSONObject json = null;
- String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
- jobtype = (jobtype != null) ? jobtype : "wf";
+ String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
+ if(isBulk != null) {
+ json = getBulkJobs(request);
+ } else {
+ String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
+ jobtype = (jobtype != null) ? jobtype : "wf";
- if (jobtype.contains("wf")) {
- json = getWorkflowJobs(request);
- }
- else if (jobtype.contains("coord")) {
- json = getCoordinatorJobs(request);
- }
- else if (jobtype.contains("bundle")) {
- json = getBundleJobs(request);
+ if (jobtype.contains("wf")) {
+ json = getWorkflowJobs(request);
+ }
+ else if (jobtype.contains("coord")) {
+ json = getCoordinatorJobs(request);
+ }
+ else if (jobtype.contains("bundle")) {
+ json = getBundleJobs(request);
+ }
}
return json;
}
@@ -371,6 +379,37 @@ public class V1JobsServlet extends BaseJ
return json;
}
+ /**
+  * Serves a bulk monitoring request: reads the bulk filter, paging and time zone parameters from the
+  * request, asks the bundle engine for the matching entries and packs them, with the paging info,
+  * into the JSON reply. A missing or sub-1 offset becomes 1, a missing or sub-1 length becomes 50 and
+  * a missing time zone becomes "GMT".
+  */
+ @SuppressWarnings("unchecked")
+ private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
+     JSONObject result = new JSONObject();
+     try {
+         String filter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
+         String offsetParam = request.getParameter(RestConstants.OFFSET_PARAM);
+         String lengthParam = request.getParameter(RestConstants.LEN_PARAM);
+         String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM);
+         if (timeZoneId == null) {
+             timeZoneId = "GMT";
+         }
+
+         int start = 1;
+         if (offsetParam != null) {
+             start = Math.max(Integer.parseInt(offsetParam), 1);
+         }
+         int len = 50;
+         if (lengthParam != null) {
+             int requested = Integer.parseInt(lengthParam);
+             if (requested >= 1) {
+                 len = requested;
+             }
+         }
+
+         BundleEngineService engineService = Services.get().get(BundleEngineService.class);
+         BundleEngine bundleEngine = engineService.getBundleEngine(getUser(request), getAuthToken(request));
+         BulkResponseInfo bulkInfo = bundleEngine.getBulkJobs(filter, start, len);
+         List<BulkResponseImpl> entries = bulkInfo.getResponses();
+
+         result.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(entries, timeZoneId));
+         result.put(JsonTags.BULK_RESPONSE_TOTAL, bulkInfo.getTotal());
+         result.put(JsonTags.BULK_RESPONSE_OFFSET, bulkInfo.getStart());
+         result.put(JsonTags.BULK_RESPONSE_LEN, bulkInfo.getLen());
+     }
+     catch (BaseEngineException ex) {
+         throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+     }
+     return result;
+ }
+
+
/**
* service implementation to submit a http job
*/
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java Wed Aug 22 21:32:58 2012
@@ -38,7 +38,7 @@ public class DateUtils {
public static final TimeZone UTC = getTimeZone("UTC");
- private static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
+ public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm";
private static String ACTIVE_MASK = ISO8601_UTC_MASK;
Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.BundleEngineException;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+
+public class TestBulkMonitorJPAExecutor extends XDataTestCase {
+ Services services;
+ JPAService jpaService;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables();
+ LocalOozie.start();
+ jpaService = Services.get().get(JPAService.class);
+ addRecordsForBulkMonitor();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ LocalOozie.stop();
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testSingleRecord() throws Exception {
+
+ String request = "bundle=" + bundleName + ";actionstatus=FAILED;"
+ + "startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z";
+
+ List<BulkResponseImpl> brList = _execQuery(request);
+ assertEquals(1, brList.size()); // only 1 action satisfies the
+ // conditions
+ BulkResponseImpl br = brList.get(0);
+ assertEquals(bundleName, br.getBundle().getAppName());
+ assertEquals("Coord1", br.getCoordinator().getAppName());
+ assertEquals(CoordinatorAction.Status.FAILED, br.getAction().getStatus());
+ assertEquals(DateUtils.parseDateUTC(CREATE_TIME).toString(), br.getAction().getCreatedTime().toString());
+ }
+
+ public void testMultipleRecords() throws Exception {
+
+ String request = "bundle=" + bundleName + ";actionstatus=FAILED,KILLED;"
+ + "startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z";
+
+ List<BulkResponseImpl> brList = _execQuery(request);
+ assertEquals(3, brList.size()); // 3 actions satisfy the conditions
+ List<String> possibleStatus = new ArrayList<String>(Arrays.asList("KILLED", "FAILED"));
+ List<String> resultStatus = new ArrayList<String>();
+ resultStatus.add(brList.get(0).getAction().getStatus().toString());
+ resultStatus.add(brList.get(1).getAction().getStatus().toString());
+ assertEquals(possibleStatus, resultStatus);
+ }
+
+ public void testJavaNoRecords() throws Exception {
+
+ String request = "bundle=BUNDLE-ABC;actionstatus=FAILED";
+
+ BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+ try {
+ jpaService.execute(bulkjpa);
+ fail(); // exception expected due to no records found for this
+ // bundle
+ }
+ catch (JPAExecutorException jex) {
+ assertTrue(jex.getMessage().contains("No bundle entries found"));
+ }
+ }
+
+ public void testMultipleCoordinators() throws Exception {
+ // there are 3 coordinators but giving range as only two of them
+ String request = "bundle=" + bundleName + ";coordinators=Coord1,Coord2;actionstatus=KILLED";
+ List<BulkResponseImpl> brList = _execQuery(request);
+ assertEquals(2, brList.size()); // 2 actions satisfy the conditions
+ assertEquals(brList.get(0).getAction().getId(), "Coord1@2");
+ assertEquals(brList.get(1).getAction().getId(), "Coord2@1");
+ }
+
+ public void testDefaultStatus() throws Exception {
+ // adding coordinator action #4 to Coord#3
+ addRecordToCoordActionTable("Coord3", 1, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
+
+ String request = "bundle=" + bundleName + ";";
+ List<BulkResponseImpl> brList = _execQuery(request);
+ assertEquals(4, brList.size()); // 4 actions satisfy the conditions
+ List<String> possibleStatus = new ArrayList<String>(Arrays.asList("FAILED", "KILLED"));
+ List<String> resultStatus = new ArrayList<String>();
+ resultStatus.add(brList.get(0).getAction().getStatus().toString());
+ resultStatus.add(brList.get(1).getAction().getStatus().toString());
+ assertEquals(possibleStatus, resultStatus);
+ }
+
+ public void testMultipleBundleIdsForName() throws Exception {
+ // Adding another bundle having same name
+ BundleJobBean bundle = new BundleJobBean();
+ bundle.setId("00002-12345-B");
+ bundle.setAppName(bundleName);
+ bundle.setStatus(BundleJob.Status.RUNNING);
+ bundle.setStartTime(new Date());
+ BundleJobInsertJPAExecutor bundleInsert = new BundleJobInsertJPAExecutor(bundle);
+
+ jpaService.execute(bundleInsert);
+ String request = "bundle=" + bundleName;
+ BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+ try {
+ jpaService.execute(bulkjpa);
+ fail(); // exception expected due to >1 records found for same
+ // bundle name
+ }
+ catch (JPAExecutorException jex) {
+ assertTrue(jex.getMessage().contains("Non-unique bundles present for same bundle name"));
+ }
+ }
+
+ private List<BulkResponseImpl> _execQuery(String request) throws JPAExecutorException, BundleEngineException {
+ BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+ BulkResponseInfo response = jpaService.execute(bulkjpa);
+ assertNotNull(response);
+ return response.getResponses();
+ }
+
+}
\ No newline at end of file
Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.servlet;
+
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.service.AuthorizationService;
+import org.apache.oozie.service.BundleEngineService;
+import org.apache.oozie.service.ForTestAuthorizationService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.ProxyUserService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.servlet.AuthFilter;
+import org.apache.oozie.servlet.HostnameFilter;
+import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class TestBulkMonitorWebServiceAPI extends XDataTestCase {
+ private EmbeddedServletContainer container;
+ private String servletPath;
+ Services services;
+ JPAService jpaService;
+
+ static {
+ new V1JobsServlet();
+ }
+
+ @Override
+ protected void setUp() throws Exception { // per-test fixture: services, DB cleanup, LocalOozie, seed records
+ super.setUp();
+ services = new Services();
+ services.init();
+ cleanUpDBTables(); // presumably empties the test DB so every test starts from the same state -- TODO confirm
+ LocalOozie.start();
+ jpaService = Services.get().get(JPAService.class); // fetched through the static Services.get(), not the field above
+ addRecordsForBulkMonitor(); // inherited helper: inserts one bundle, three coordinators and three coordinator actions
+ }
+
+ @Override
+ protected void tearDown() throws Exception { // releases what setUp started, in reverse order
+ LocalOozie.stop();
+ services.destroy(); // the Services instance created in setUp
+ super.tearDown();
+ }
+
+ /**
+ * Builds the URL of a servlet endpoint in the embedded container: the servlet URL, an optional
+ * "/resource" suffix, and the given parameters appended as a URL-encoded (UTF-8) query string.
+ */
+ private URL createURL(String servletPath, String resource, Map<String, String> parameters) throws Exception {
+ StringBuilder url = new StringBuilder();
+ url.append(container.getServletURL(servletPath));
+ if (resource != null && !resource.isEmpty()) {
+ url.append("/").append(resource);
+ }
+ // the first parameter is introduced by "?", every following one by "&"
+ String delimiter = "?";
+ for (Map.Entry<String, String> entry : parameters.entrySet()) {
+ String key = URLEncoder.encode(entry.getKey(), "UTF-8");
+ String value = URLEncoder.encode(entry.getValue(), "UTF-8");
+ url.append(delimiter).append(key).append("=").append(value);
+ delimiter = "&";
+ }
+ return new URL(url.toString());
+ }
+
+ private URL createURL(String resource, Map<String, String> parameters) throws Exception {
+ return createURL(servletPath, resource, parameters); // uses the servletPath field, which runTest sets while a test is running
+ }
+
+ private void runTest(String servletPath, Class servletClass, boolean securityEnabled, Callable<Void> assertions)
+ throws Exception {
+ runTest(new String[] { servletPath }, new Class[] { servletClass }, securityEnabled, assertions); // single-servlet convenience: wraps the arguments in one-element arrays
+ }
+
+ /**
+ * Initializes a new Services instance, starts an embedded servlet container that hosts the given
+ * servlets (servletPath[i] is served by servletClass[i]), invokes the assertions, and finally
+ * stops the container and destroys the Services instance again.
+ */
+ private void runTest(String[] servletPath, Class[] servletClass, boolean securityEnabled, Callable<Void> assertions)
+ throws Exception {
+ Services testServices = new Services();
+ // createURL(resource, parameters) resolves against the first servlet path
+ this.servletPath = servletPath[0];
+ try {
+ String proxyUser = getTestUser();
+ String proxyUserPrefix = ProxyUserService.CONF_PREFIX + proxyUser;
+ testServices.getConf().set(proxyUserPrefix + ProxyUserService.HOSTS, "*");
+ testServices.getConf().set(proxyUserPrefix + ProxyUserService.GROUPS, "*");
+ testServices.init();
+ testServices.getConf().setBoolean(AuthorizationService.CONF_SECURITY_ENABLED, securityEnabled);
+ Services.get().setService(ForTestAuthorizationService.class);
+ Services.get().setService(BundleEngineService.class);
+ container = new EmbeddedServletContainer("oozie");
+ int endpoint = 0;
+ while (endpoint < servletPath.length) {
+ container.addServletEndpoint(servletPath[endpoint], servletClass[endpoint]);
+ endpoint++;
+ }
+ container.addFilter("*", HostnameFilter.class);
+ container.addFilter("*", AuthFilter.class);
+ setSystemProperty("user.name", getTestUser());
+ container.start();
+ assertions.call();
+ }
+ finally {
+ this.servletPath = null;
+ if (container != null) {
+ container.stop();
+ }
+ testServices.destroy();
+ container = null;
+ }
+ }
+
+ /**
+ * Queries the bulk API for the FAILED actions of Coord1 and checks that exactly one record comes
+ * back, carrying the expected bundle, coordinator and action details.
+ */
+ public void testSingleRecord() throws Exception {
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+
+ String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+ + "actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z";
+ JSONArray array = _requestToServer(bulkRequest);
+
+ assertEquals(1, array.size());
+ JSONObject record = (JSONObject) array.get(0);
+ JSONObject jbundle = (JSONObject) record.get(JsonTags.BULK_RESPONSE_BUNDLE);
+ JSONObject jcoord = (JSONObject) record.get(JsonTags.BULK_RESPONSE_COORDINATOR);
+ JSONObject jaction = (JSONObject) record.get(JsonTags.BULK_RESPONSE_ACTION);
+
+ assertNotNull(jbundle);
+ assertNotNull(jcoord);
+ assertNotNull(jaction);
+
+ // assertEquals takes (expected, actual); this order keeps failure messages truthful.
+ // The bundle is compared against the name that was used in the request.
+ assertEquals(bundleName, jbundle.get(JsonTags.BUNDLE_JOB_NAME));
+ assertEquals("Coord1", jcoord.get(JsonTags.COORDINATOR_JOB_NAME));
+ assertEquals("RUNNING", jcoord.get(JsonTags.COORDINATOR_JOB_STATUS));
+ assertEquals("FAILED", jaction.get(JsonTags.COORDINATOR_ACTION_STATUS));
+ // the JSON time looks like "Fri, 02 Jan 2009 00:00:00 GMT"; drop the day-of-week prefix
+ // so that the remainder can be compared with Date.toGMTString()
+ String createdTime = jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString();
+ assertEquals(DateUtils.parseDateUTC(CREATE_TIME).toGMTString(), createdTime.split(", ")[1]);
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Queries the bulk API for the FAILED and KILLED actions of Coord1 and checks that both actions
+ * are returned, the FAILED one first.
+ */
+ public void testMultipleRecords() throws Exception {
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+
+ String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+ + "actionStatus=FAILED,KILLED;startcreatedtime=2012-07-21T00:00Z";
+ JSONArray array = _requestToServer(bulkRequest);
+
+ assertEquals(2, array.size());
+ JSONObject first = (JSONObject) array.get(0);
+ JSONObject second = (JSONObject) array.get(1);
+ JSONObject jbundle = (JSONObject) first.get(JsonTags.BULK_RESPONSE_BUNDLE);
+ JSONObject jaction1 = (JSONObject) first.get(JsonTags.BULK_RESPONSE_ACTION);
+ JSONObject jaction2 = (JSONObject) second.get(JsonTags.BULK_RESPONSE_ACTION);
+
+ assertNotNull(jbundle);
+ assertNotNull(jaction1);
+ assertNotNull(jaction2);
+
+ // assertEquals takes (expected, actual); this order keeps failure messages truthful
+ assertEquals(bundleName, jbundle.get(JsonTags.BUNDLE_JOB_NAME));
+ assertEquals("FAILED", jaction1.get(JsonTags.COORDINATOR_ACTION_STATUS));
+ assertEquals("KILLED", jaction2.get(JsonTags.COORDINATOR_ACTION_STATUS));
+
+ return null;
+ }
+ });
+ }
+
+ public void testNoRecords() throws Exception {
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+ String bulkRequest = "bundle=BUNDLE-ABC";
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+ params.put(RestConstants.OFFSET_PARAM, "1");
+ params.put(RestConstants.LEN_PARAM, "5");
+ URL url = createURL("", params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ // WS call will throw BAD_REQUEST code 400 error because no
+ // records found for this bundle
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Restricts the bulk query to two of the three coordinators and to KILLED actions, and checks
+ * that the KILLED action of each of the two coordinators is returned.
+ */
+ public void testMultipleCoordinators() throws Exception {
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+ // giving range as 2 of the total 3 coordinators
+ String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1,Coord2;actionstatus=KILLED";
+ JSONArray array = _requestToServer(bulkRequest);
+
+ assertEquals(2, array.size());
+ JSONObject first = (JSONObject) array.get(0);
+ JSONObject second = (JSONObject) array.get(1);
+ JSONObject jbundle = (JSONObject) first.get(JsonTags.BULK_RESPONSE_BUNDLE);
+ JSONObject jaction1 = (JSONObject) first.get(JsonTags.BULK_RESPONSE_ACTION);
+ JSONObject jaction2 = (JSONObject) second.get(JsonTags.BULK_RESPONSE_ACTION);
+
+ assertNotNull(jbundle);
+ assertNotNull(jaction1);
+ assertNotNull(jaction2);
+
+ // assertEquals takes (expected, actual); this order keeps failure messages truthful
+ assertEquals("Coord1@2", jaction1.get(JsonTags.COORDINATOR_ACTION_ID));
+ assertEquals("Coord2@1", jaction2.get(JsonTags.COORDINATOR_ACTION_ID));
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Adds a FAILED action to Coord3 and then sends a bulk request that names only the bundle;
+ * four records are expected back.
+ */
+ public void testDefaultStatus() throws Exception {
+
+ // nominal time is one day before the created time
+ Date createdTime = DateUtils.parseDateUTC(CREATE_TIME);
+ Calendar nominal = Calendar.getInstance();
+ nominal.setTime(createdTime);
+ nominal.add(Calendar.DATE, -1);
+
+ // fourth coordinator action overall, the first one for Coord3
+ CoordinatorActionBean coord3Action = new CoordinatorActionBean();
+ coord3Action.setId("Coord3@1");
+ coord3Action.setStatus(CoordinatorAction.Status.FAILED);
+ coord3Action.setCreatedTime(createdTime);
+ coord3Action.setJobId("Coord3");
+ coord3Action.setNominalTime(nominal.getTime());
+ jpaService.execute(new CoordActionInsertJPAExecutor(coord3Action));
+
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+ JSONArray records = _requestToServer("bundle=" + bundleName);
+ assertEquals(4, records.size());
+ return null;
+ }
+ });
+ }
+
+ public void testMultipleBundleIdsForName() throws Exception {
+
+ // Adding another bundle having same name
+ BundleJobBean bundle = new BundleJobBean();
+ bundle.setId("00002-12345-B");
+ bundle.setAppName(bundleName);
+ bundle.setStatus(BundleJob.Status.RUNNING);
+ bundle.setStartTime(new Date());
+ BundleJobInsertJPAExecutor bundleInsert = new BundleJobInsertJPAExecutor(bundle);
+ jpaService.execute(bundleInsert);
+
+ runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+ public Void call() throws Exception {
+ String bulkRequest = "bundle=" + bundleName;
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+ params.put(RestConstants.OFFSET_PARAM, "1");
+ params.put(RestConstants.LEN_PARAM, "5");
+ URL url = createURL("", params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ // WS call is expected to answer with BAD_REQUEST (code 400) because two
+ // bundles are now stored under the same name, which makes the request ambiguous
+ assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Sends the given bulk filter to the jobs servlet (offset 1, length 5) with an HTTP GET, asserts
+ * a 200 response with a JSON content type, and returns the array of bulk responses.
+ *
+ * @param bulkRequest the bulk filter, e.g. "bundle=NAME;coordinators=Coord1"
+ * @return the array stored under JsonTags.BULK_RESPONSES in the response body
+ */
+ private JSONArray _requestToServer(String bulkRequest) throws Exception {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+ params.put(RestConstants.OFFSET_PARAM, "1");
+ params.put(RestConstants.LEN_PARAM, "5");
+ URL url = createURL("", params);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+ assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+ // report a missing header as an assertion failure rather than a NullPointerException
+ String contentType = conn.getHeaderField("content-type");
+ assertNotNull("Response has no content-type header", contentType);
+ assertTrue(contentType.startsWith(RestConstants.JSON_CONTENT_TYPE));
+ // the API documents its JSON responses as UTF-8, so do not depend on the platform charset
+ InputStreamReader reader = new InputStreamReader(conn.getInputStream(), "UTF-8");
+ try {
+ JSONObject json = (JSONObject) JSONValue.parse(reader);
+ return (JSONArray) json.get(JsonTags.BULK_RESPONSES);
+ }
+ finally {
+ // release the response stream even when parsing or the cast fails
+ reader.close();
+ }
+ }
+
+}
\ No newline at end of file
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Aug 22 21:32:58 2012
@@ -25,6 +25,7 @@ import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
+import java.util.Calendar;
import java.util.Date;
import java.util.regex.Matcher;
@@ -39,6 +40,7 @@ import org.apache.oozie.SLAEventBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
+import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
@@ -89,6 +91,9 @@ public abstract class XDataTestCase exte
+ " <sla:qa-contact>abc@yahoo.com</sla:qa-contact>" + " <sla:se-contact>abc@yahoo.com</sla:se-contact>"
+ "</sla:info>";
+ protected String bundleName;
+ protected String CREATE_TIME = "2012-07-22T00:00Z";
+
/**
* Insert coord job for testing.
*
@@ -1128,4 +1133,64 @@ public abstract class XDataTestCase exte
}
}
+ /**
+ * Adds the db records for the Bulk Monitor tests: one bundle job, three coordinator jobs
+ * (Coord1, Coord2, Coord3) attached to it, and three coordinator actions (Coord1@1 FAILED,
+ * Coord1@2 KILLED, Coord2@1 KILLED). The bundle's application name is stored in bundleName.
+ */
+ protected void addRecordsForBulkMonitor() throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ // adding the bundle job
+ BundleJobBean bundle = addRecordToBundleJobTable(BundleJob.Status.RUNNING, false);
+ String bundleId = bundle.getId();
+ bundleName = bundle.getAppName();
+
+ // adding coordinator job(s) for this bundle
+ addRecordToCoordJobTableWithBundle(bundleId, "Coord1", CoordinatorJob.Status.RUNNING, true, true, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "Coord2", CoordinatorJob.Status.RUNNING, true, true, 1);
+ addRecordToCoordJobTableWithBundle(bundleId, "Coord3", CoordinatorJob.Status.RUNNING, true, true, 1);
+
+ // adding coordinator actions #1 and #2 to Coord#1, and action #3 to Coord#2
+ insertBulkMonitorCoordAction(jpaService, "Coord1@1", "Coord1", CoordinatorAction.Status.FAILED);
+ insertBulkMonitorCoordAction(jpaService, "Coord1@2", "Coord1", CoordinatorAction.Status.KILLED);
+ insertBulkMonitorCoordAction(jpaService, "Coord2@1", "Coord2", CoordinatorAction.Status.KILLED);
+ }
+
+ /**
+ * Inserts one coordinator action for the Bulk Monitor tests. Its created time is CREATE_TIME and
+ * its nominal time is one calendar day earlier.
+ */
+ private void insertBulkMonitorCoordAction(JPAService jpaService, String actionId, String coordJobId,
+ CoordinatorAction.Status status) throws Exception {
+ CoordinatorActionBean action = new CoordinatorActionBean();
+ action.setId(actionId);
+ action.setStatus(status);
+ action.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
+ action.setJobId(coordJobId);
+
+ Calendar cal = Calendar.getInstance();
+ cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
+ cal.add(Calendar.DATE, -1);
+
+ action.setNominalTime(cal.getTime());
+ jpaService.execute(new CoordActionInsertJPAExecutor(action));
+ }
+
}
Modified: incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Wed Aug 22 21:32:58 2012
@@ -786,6 +786,110 @@ Additionally the =start= and =len= param
Moreover, the =jobtype= parameter could be used to determine what type of job is looking for.
The valid values of job type are: =workflow=, =coordinator= or =bundle=.
+---++++ Jobs information using Bulk API
+
+An HTTP GET request retrieves a bulk response for all actions, corresponding to a particular bundle, that satisfy user specified criteria.
+This is useful for monitoring purposes, where user can find out about the status of downstream jobs with a single bulk request.
+The criteria are used for filtering the actions returned. Valid options (case insensitive) for these request criteria are:
+
+ * bundle: the application name from the bundle definition
+ * coordinators: the application name(s) from the coordinator definition.
+ * actionStatus: the status of coordinator action (Valid values are WAITING, READY, SUBMITTED, RUNNING, SUSPENDED, TIMEDOUT, SUCCEEDED, KILLED, FAILED)
+ * startCreatedTime: the start of the window you want to look at, of the actions' created time
+ * endCreatedTime: the end of above window
+ * startScheduledTime: the start of the window you want to look at, of the actions' scheduled i.e. nominal time.
+ * endScheduledTime: the end of above window
+
+Specifying 'bundle' is REQUIRED. All the other criteria are OPTIONAL, but omitting them might return thousands of results depending on the size of your job, in which case pagination (described below) comes into play.
+
+The query will do an AND among all the filter names.
+
+The syntax for the request criteria is <verbatim>[NAME=VALUE][;NAME=VALUE]*</verbatim>
+
+For 'coordinators' and 'actionStatus', if user wants to check for multiple values, they can be passed in a comma-separated manner. *Note*: The query will do an OR among them, hence there is no need to repeat the criteria name.
+
+All the time parameter values should be specified in ISO8601 format i.e. yyyy-MM-dd'T'HH:mm'Z'
+
+Additionally the =offset= and =len= parameters can be used as usual for pagination. The =offset= parameter is base 1.
+
+*Request:*
+
+<verbatim>
+GET /oozie/v1/jobs?bulk=bundle%3Dmy-bundle-app;coordinators%3Dmy-coord-1,my-coord-5;actionStatus%3DKILLED&offset=1&len=50
+</verbatim>
+
+Note that the filter is URL encoded; its decoded value is <code>bundle=my-bundle-app;coordinators=my-coord-1,my-coord-5;actionStatus=KILLED</code>. When typing the URL in a browser, one can type the decoded value itself, i.e. using '='.
+
+*Response:*
+
+<verbatim>
+HTTP/1.1 200 OK
+Content-Type: application/json;charset=UTF-8
+.
+{
+ offset: 1,
+ len: 50,
+ total: 1002,
+** bulkresponses: [
+** {
+ bulkbundle:
+ {
+ bundleJobName: "my-bundle-app",
+ bundleJobId: "0-200905191240-oozie-B",
+ status: "SUSPENDED",
+ },
+ bulkcoord:
+ {
+ coordJobName: "my-coord-1",
+ status: "SUSPENDED",
+ },
+ bulkaction:
+ {
+ id: "0-200905191240-oozie-C@21",
+ coordJobId: "0-200905191240-oozie-C",
+ actionNumber: 21,
+ externalId: "job_00076_0009",
+ status: "KILLED",
+ externalStatus: "FAILED",
+ errorCode: "E0902",
+ errorMessage: "Input file corrupt",
+ createdTime: "Fri, 02 Jan 2009 00:00:00 GMT",
+ nominalTime: "Thu, 01 Jan 2009 00:00:00 GMT",
+ missingDependencies: "hdfs://nn:port/user/joe/file.txt"
+ },
+ },
+** {
+ bulkbundle:
+ {
+ bundleJobName: "my-bundle-app",
+ bundleJobId: "0-200905191240-oozie-B",
+ status: "SUSPENDED",
+ },
+ bulkcoord:
+ {
+ coordJobName: "my-coord-5",
+ status: "SUSPENDED",
+ },
+ bulkaction:
+ {
+ id: "0-200905191245-oozie-C@114",
+ coordJobId: "0-200905191245-oozie-C",
+ actionNumber: 114,
+ externalId: "job_00076_0265",
+ status: "KILLED",
+ externalStatus: "KILLED",
+ errorCode: "E0603",
+ errorMessage: "SQL error in operation ...",
+ createdTime: "Fri, 02 Jan 2009 00:00:00 GMT",
+ nominalTime: "Thu, 01 Jan 2009 00:00:00 GMT",
+ missingDependencies:
+ }
+ }
+ ...
+ ]
+}
+</verbatim>
+
</noautolink>
Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Aug 22 21:32:58 2012
@@ -1,5 +1,6 @@
-- Oozie 3.3.0 release (trunk - unreleased)
+OOZIE-848 Bulk Monitoring API - Consolidated view of jobs (mona via virag)
OOZIE-934 Exception reporting during Services startup is inadequate (mona via virag)
OOZIE-914 Make sure all commands do their JPA writes within a single JPA executor (mona via virag)
OOZIE-918 Changing log level from DEBUG to TRACE for trivial log statements (mona via virag)