You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by vi...@apache.org on 2012/08/22 23:32:59 UTC

svn commit: r1376264 - in /incubator/oozie/trunk: ./ client/src/main/java/org/apache/oozie/cli/ client/src/main/java/org/apache/oozie/client/ client/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/org/apache/oozie/ core/src/main/java/org...

Author: virag
Date: Wed Aug 22 21:32:58 2012
New Revision: 1376264

URL: http://svn.apache.org/viewvc?rev=1376264&view=rev
Log:
OOZIE-848 Bulk Monitoring API - Consolidated view of jobs (mona via virag)

Added:
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
Modified:
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
    incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/cli/OozieCLI.java Wed Aug 22 21:32:58 2012
@@ -50,6 +50,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.oozie.BuildInfo;
 import org.apache.oozie.client.AuthOozieClient;
+import org.apache.oozie.client.BulkResponse;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -133,6 +134,8 @@ public class OozieCLI {
     
     public static final String INFO_TIME_ZONES_OPTION = "timezones";
 
+    public static final String BULK_OPTION = "bulk";
+
     private static final String[] OOZIE_HELP = {
             "the env variable '" + ENV_OOZIE_URL + "' is used as default value for the '-" + OOZIE_OPTION + "' option",
             "the env variable '" + ENV_OOZIE_TIME_ZONE + "' is used as default value for the '-" + TIME_ZONE_OPTION + "' option",
@@ -327,6 +330,10 @@ public class OozieCLI {
                 "use time zone with the specified ID (default GMT).\nSee 'oozie info -timezones' for a list");
         Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
         Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user");
+        Option bulkMonitor = new Option(BULK_OPTION, true, "key-value pairs to filter bulk jobs response. e.g. bundle=<B>\\;" +
+                "coordinators=<C>\\;actionstatus=<S>\\;startcreatedtime=<SC>\\;endcreatedtime=<EC>\\;" +
+                "startscheduledtime=<SS>\\;endscheduledtime=<ES>\\; coordinators and actionstatus can be multiple comma separated values" +
+                "bundle and coordinators are 'names' of those jobs. Bundle name is mandatory, other params are optional");
         start.setType(Integer.class);
         len.setType(Integer.class);
         Options jobsOptions = new Options();
@@ -340,6 +347,7 @@ public class OozieCLI {
         jobsOptions.addOption(filter);
         jobsOptions.addOption(jobtype);
         jobsOptions.addOption(verbose);
+        jobsOptions.addOption(bulkMonitor);
         addAuthOptions(jobsOptions);
         return jobsOptions;
     }
@@ -1088,6 +1096,7 @@ public class OozieCLI {
 
     private static final String WORKFLOW_ACTION_FORMATTER = "%-78s%-10s%-23s%-11s%-10s";
     private static final String COORD_ACTION_FORMATTER = "%-43s%-10s%-37s%-10s%-21s%-21s";
+    private static final String BULK_RESPONSE_FORMATTER = "%-41s%-41s%-37s%-37s%-13s%-21s%-24s";
 
     private void printJob(WorkflowJob job, String timeZoneId, boolean verbose) throws IOException {
         System.out.println("Job ID : " + maskIfNull(job.getId()));
@@ -1170,8 +1179,13 @@ public class OozieCLI {
         String timeZoneId = getTimeZoneId(commandLine);
         jobtype = (jobtype != null) ? jobtype : "wf";
         int len = Integer.parseInt((s != null) ? s : "0");
+        String bulkFilterString = commandLine.getOptionValue(BULK_OPTION);
+
         try {
-            if (jobtype.toLowerCase().contains("wf")) {
+            if (bulkFilterString != null) {
+                printBulkJobs(wc.getBulkInfo(bulkFilterString, start, len), timeZoneId);
+            }
+            else if (jobtype.toLowerCase().contains("wf")) {
                 printJobs(wc.getJobsInfo(filter, start, len), timeZoneId, commandLine.hasOption(VERBOSE_OPTION));
             }
             else if (jobtype.toLowerCase().startsWith("coord")) {
@@ -1233,6 +1247,24 @@ public class OozieCLI {
         }
     }
 
+    private void printBulkJobs(List<BulkResponse> jobs, String timeZoneId) throws IOException {
+        if (jobs != null && jobs.size() > 0) {
+            System.out.println(String.format(BULK_RESPONSE_FORMATTER, "Bundle Name", "Coordinator Name",
+                    "Coord Action ID", "External ID", "Status", "Created Time", "Error Message"));
+
+            for (BulkResponse response : jobs) {
+                System.out.println(String.format(BULK_RESPONSE_FORMATTER, maskIfNull((response.getBundle()).getAppName()),
+                        maskIfNull((response.getCoordinator()).getAppName()), maskIfNull((response.getAction()).getId()),
+                        maskIfNull((response.getAction()).getExternalId()), (response.getAction()).getStatus(),
+                        maskDate((response.getAction()).getCreatedTime(), timeZoneId, false), (response.getAction()).getErrorMessage()));
+                System.out.println(RULER);
+            }
+        }
+        else {
+            System.out.println("Bulk request criteria did not match any coordinator actions");
+        }
+    }
+
     private void printBundleJobs(List<BundleJob> jobs, String timeZoneId, boolean verbose) throws IOException {
         if (jobs != null && jobs.size() > 0) {
             if (verbose) {

Added: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java (added)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/BulkResponse.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client;
+
+/**
+ * Bean that represents an Oozie application instance.
+ */
+
+public interface BulkResponse {
+
+    /**
+     * Return the coordinator comprising this bulk object
+     *
+     * @return the coordinator.
+     */
+    CoordinatorJob getCoordinator();
+
+    /**
+     * Return the bundle comprising this bulk object
+     *
+     * @return the bundle job.
+     */
+    BundleJob getBundle();
+
+    /**
+     * Return the coordinator action comprising this bulk object
+     *
+     * @return the coordinator action.
+     */
+    CoordinatorAction getAction();
+}

Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Aug 22 21:32:58 2012
@@ -1062,6 +1062,32 @@ public class OozieClient {
         }
     }
 
+    private class BulkResponseStatus extends ClientCallable<List<BulkResponse>> {
+
+        BulkResponseStatus(String filter, int start, int len) {
+            super("GET", RestConstants.JOBS, "", prepareParams(RestConstants.JOBS_BULK_PARAM, filter,
+                    RestConstants.OFFSET_PARAM, Integer.toString(start), RestConstants.LEN_PARAM, Integer.toString(len)));
+        }
+
+        @Override
+        protected List<BulkResponse> call(HttpURLConnection conn) throws IOException, OozieClientException {
+            conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
+            if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {
+                Reader reader = new InputStreamReader(conn.getInputStream());
+                JSONObject json = (JSONObject) JSONValue.parse(reader);
+                JSONArray results = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
+                if (results == null) {
+                    results = new JSONArray();
+                }
+                return JsonToBean.createBulkResponseList(results);
+            }
+            else {
+                handleError(conn);
+            }
+            return null;
+        }
+    }
+
     private class CoordRerun extends ClientCallable<List<CoordinatorAction>> {
 
         CoordRerun(String jobId, String rerunType, String scope, boolean refresh, boolean noCleanup) {
@@ -1364,6 +1390,10 @@ public class OozieClient {
         return new BundleJobsStatus(filter, start, len).call();
     }
 
+    public List<BulkResponse> getBulkInfo(String filter, int start, int len) throws OozieClientException {
+        return new BulkResponseStatus(filter, start, len).call();
+    }
+
     private class GetQueueDump extends ClientCallable<List<String>> {
         GetQueueDump() {
             super("GET", RestConstants.ADMIN, RestConstants.ADMIN_QUEUE_DUMP_RESOURCE, prepareParams());

Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java Wed Aug 22 21:32:58 2012
@@ -182,7 +182,15 @@ public interface JsonTags {
     public static final Object BUNDLE_JOB_TOTAL = "total";
     public static final Object BUNDLE_JOB_OFFSET = "offset";
     public static final Object BUNDLE_JOB_LEN = "len";
-    
+
+    public static final String BULK_RESPONSE_BUNDLE = "bulkbundle";
+    public static final String BULK_RESPONSE_COORDINATOR = "bulkcoord";
+    public static final String BULK_RESPONSE_ACTION = "bulkaction";
+    public static final Object BULK_RESPONSES = "bulkresponses";
+    public static final Object BULK_RESPONSE_TOTAL = "total";
+    public static final Object BULK_RESPONSE_OFFSET = "offset";
+    public static final Object BULK_RESPONSE_LEN = "len";
+
     public static final String AVAILABLE_TIME_ZONES = "available-timezones";
     public static final String TIME_ZOME_DISPLAY_NAME = "timezoneDisplayName";
     public static final String TIME_ZONE_ID = "timezoneId";

Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/JsonToBean.java Wed Aug 22 21:32:58 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.client.rest;
 
+import org.apache.oozie.client.BulkResponse;
 import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
@@ -63,6 +64,7 @@ public class JsonToBean {
     private static final Map<String, Property> COORD_JOB = new HashMap<String, Property>();
     private static final Map<String, Property> COORD_ACTION = new HashMap<String, Property>();
     private static final Map<String, Property> BUNDLE_JOB = new HashMap<String, Property>();
+    private static final Map<String, Property> BULK_RESPONSE = new HashMap<String, Property>();
 
     static {
         WF_ACTION.put("getId", new Property(JsonTags.WORKFLOW_ACTION_ID, String.class));
@@ -169,6 +171,11 @@ public class JsonToBean {
         BUNDLE_JOB.put("getConsoleUrl",new Property(JsonTags.BUNDLE_JOB_CONSOLE_URL, String.class));
         BUNDLE_JOB.put("getCoordinators",new Property(JsonTags.BUNDLE_COORDINATOR_JOBS, CoordinatorJob.class, true));
         BUNDLE_JOB.put("toString", new Property(JsonTags.TO_STRING, String.class));
+
+        BULK_RESPONSE.put("getBundle", new Property(JsonTags.BULK_RESPONSE_BUNDLE, BundleJob.class, true));
+        BULK_RESPONSE.put("getCoordinator", new Property(JsonTags.BULK_RESPONSE_COORDINATOR, CoordinatorJob.class, true));
+        BULK_RESPONSE.put("getAction", new Property(JsonTags.BULK_RESPONSE_ACTION, CoordinatorAction.class, true));
+
     }
 
     /**
@@ -369,4 +376,21 @@ public class JsonToBean {
         }
         return list;
     }
+
+    /**
+     * Creates a list of bulk response beans from a JSON array.
+     *
+     * @param json json array.
+     * @return a list of bulk response beans from a JSON array.
+     */
+    public static List<BulkResponse> createBulkResponseList(JSONArray json) {
+        List<BulkResponse> list = new ArrayList<BulkResponse>();
+        for (Object obj : json) {
+            BulkResponse bulkObj = (BulkResponse) Proxy.newProxyInstance
+                    (JsonToBean.class.getClassLoader(), new Class[]{BulkResponse.class},
+                    new JsonInvocationHandler(BULK_RESPONSE, (JSONObject) obj));
+            list.add(bulkObj);
+        }
+        return list;
+    }
 }

Modified: incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java (original)
+++ incubator/oozie/trunk/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java Wed Aug 22 21:32:58 2012
@@ -109,6 +109,8 @@ public interface RestConstants {
 
     public static final String JOBS_FILTER_PARAM = "filter";
 
+    public static final String JOBS_BULK_PARAM = "bulk";
+
     public static final String JOBS_EXTERNAL_ID_PARAM = "external-id";
 
     public static final String ADMIN_STATUS_RESOURCE = "status";

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BulkResponseInfo.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie;
+
+import java.util.List;
+
+import org.apache.oozie.client.rest.BulkResponseImpl;
+
+/**
+ * Class that stores the entire retrieved info from the bulk request command
+ * along with params for pagination of the results
+ * Query execution in BulkJPAExecutor returns an object of BulkResponseInfo class
+ */
+public class BulkResponseInfo {
+    private int start = 1;
+    private int len = 50;
+    private long total;
+    private List<BulkResponseImpl> responses;
+
+    /**
+     * Create a bulk response info bean.
+     *
+     * @param bundle job being returned.
+     * @param start bulk entries offset.
+     * @param len number of bulk entries.
+     * @param total total bulk entries.
+     */
+    public BulkResponseInfo(List<BulkResponseImpl> responses, int start, int len, long total) {
+        this.start = start;
+        this.len = len;
+        this.total = total;
+        this.responses = responses;
+    }
+
+    /**
+     * Return the coordinator actions being returned.
+     *
+     * @return the coordinator actions being returned.
+     */
+    public List<BulkResponseImpl> getResponses() {
+        return responses;
+    }
+
+    /**
+     * Return the offset of the bulk entries being returned.
+     * <p/>
+     * For pagination purposes.
+     *
+     * @return the offset of the bulk entries being returned.
+     */
+    public int getStart() {
+        return start;
+    }
+
+    /**
+     * Return the number of the bulk entries being returned.
+     * <p/>
+     * For pagination purposes.
+     *
+     * @return the number of the bulk entries being returned.
+     */
+    public int getLen() {
+        return len;
+    }
+
+    /**
+     * Return the total number of bulk entries.
+     * <p/>
+     * For pagination purposes.
+     *
+     * @return the total number of bulk entries.
+     */
+    public long getTotal() {
+        return total;
+    }
+}

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/BundleEngine.java Wed Aug 22 21:32:58 2012
@@ -19,6 +19,7 @@ package org.apache.oozie;
 
 import java.io.IOException;
 import java.io.Writer;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -29,10 +30,13 @@ import java.util.Set;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.command.BulkJobsXCommand;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.bundle.BundleJobChangeXCommand;
 import org.apache.oozie.command.bundle.BundleJobResumeXCommand;
@@ -46,6 +50,7 @@ import org.apache.oozie.command.bundle.B
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogService;
+import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XLogStreamer;
@@ -357,4 +362,99 @@ public class BundleEngine extends BaseEn
         return map;
     }
 
+    /**
+     * Get bulk job response
+     *
+     * @param filter the filter string
+     * @param start start location for paging
+     * @param len total length to get
+     * @return bulk job info
+     * @throws BundleEngineException thrown if failed to get bulk job info
+     */
+    public BulkResponseInfo getBulkJobs(String bulkFilter, int start, int len) throws BundleEngineException {
+        Map<String,List<String>> bulkRequestMap = parseBulkFilter(bulkFilter);
+        try {
+            return new BulkJobsXCommand(bulkRequestMap, start, len).call();
+        }
+        catch (CommandException ex) {
+            throw new BundleEngineException(ex);
+        }
+    }
+
+    /**
+     * Parse filter string to a map with key = filter name and values = filter values
+     * Allowed keys are defined as constants on top
+     *
+     * @param filter the filter string
+     * @return filter key-value pair map
+     * @throws BundleEngineException thrown if failed to parse filter string
+     */
+    public static Map<String,List<String>> parseBulkFilter(String bulkParams) throws BundleEngineException {
+
+        Map<String,List<String>> bulkFilter = new HashMap<String,List<String>>();
+        // Functionality can be extended to different job levels - TODO extend filter parser and query
+        // E.g. String filterlevel = "coordinatoraction"; BulkResponseImpl.BULK_FILTER_LEVEL
+        if (bulkFilter != null) {
+            StringTokenizer st = new StringTokenizer(bulkParams, ";");
+            while (st.hasMoreTokens()) {
+                String token = st.nextToken();
+                if (token.contains("=")) {
+                    String[] pair = token.split("=");
+                    if (pair.length != 2) {
+                        throw new BundleEngineException(ErrorCode.E0420, token,
+                                "elements must be name=value pairs");
+                    }
+                    pair[0] = pair[0].toLowerCase();
+                    String[] values = pair[1].split(",");
+                    if (!BulkResponseImpl.BULK_FILTER_NAMES.contains(pair[0])) {
+                        throw new BundleEngineException(ErrorCode.E0420, token, XLog.format("invalid parameter name [{0}]",
+                                pair[0]));
+                    }
+                    // special check and processing for time related params
+                    if (pair[0].contains("time")) {
+                        try {
+                            DateUtils.parseDateUTC(pair[1]);
+                        }
+                        catch (ParseException e) {
+                            throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
+                                    "invalid value [{0}] for time. A datetime value of pattern [{1}] is expected", pair[1],
+                                    DateUtils.ISO8601_UTC_MASK));
+                        }
+                    }
+                    // special check for action status param
+                    // TODO: when extended for levels other than coord action, check against corresponding level's Status values
+                    if (pair[0].equals(BulkResponseImpl.BULK_FILTER_STATUS)) {
+                        for(String value : values) {
+                            try {
+                                CoordinatorAction.Status.valueOf(value);
+                            }
+                            catch (IllegalArgumentException ex) {
+                                throw new BundleEngineException(ErrorCode.E0420, token, XLog.format(
+                                        "invalid action status [{0}]", value));
+                            }
+                        }
+                    }
+                    // eventually adding into map for all cases e.g. names, times, status
+                    List<String> list = bulkFilter.get(pair[0]);
+                    if (list == null) {
+                        list = new ArrayList<String>();
+                        bulkFilter.put(pair[0], list);
+                    }
+                    for(String value : values) {
+                        value = value.trim();
+                        if(value.isEmpty()) {
+                            throw new BundleEngineException(ErrorCode.E0420, token, "value is empty or whitespace");
+                        }
+                        list.add(value);
+                    }
+                } else {
+                    throw new BundleEngineException(ErrorCode.E0420, token, "elements must be name=value pairs");
+                }
+            }
+            if(!bulkFilter.containsKey(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME)) {
+                throw new BundleEngineException(ErrorCode.E0305, BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
+            }
+        }
+        return bulkFilter;
+    }
 }

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.client.rest;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.client.BulkResponse;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+
+/**
+ * Server-side implementation class of the client interface BulkResponse
+ * Declares all the bulk request specific user parameters and handling as JSON object
+ */
+public class BulkResponseImpl implements BulkResponse, JsonBean {
+    private BundleJobBean bundle;
+    private CoordinatorJobBean coordinator;
+    private CoordinatorActionBean action;
+
+    public static final String BULK_FILTER_BUNDLE_NAME = "bundle";
+    public static final String BULK_FILTER_COORD_NAME = "coordinators";
+    public static final String BULK_FILTER_LEVEL = "filterlevel";
+    public static final String BULK_FILTER_STATUS = "actionstatus";
+    public static final String BULK_FILTER_START_CREATED_EPOCH = "startcreatedtime";
+    public static final String BULK_FILTER_END_CREATED_EPOCH = "endcreatedtime";
+    public static final String BULK_FILTER_START_NOMINAL_EPOCH = "startscheduledtime";
+    public static final String BULK_FILTER_END_NOMINAL_EPOCH = "endscheduledtime";
+    public static final String BULK_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:SS'Z'";
+
+    public static final Set<String> BULK_FILTER_NAMES = new HashSet<String>();
+
+    static {
+
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD_NAME);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
+        BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
+
+    }
+
+    /**
+     * Construct JSON object using the bulk request object and the associated tags
+     */
+    public JSONObject toJSONObject() {
+        return toJSONObject("GMT");
+    }
+
+    /**
+     * Construct JSON object using the bulk request object and the associated tags
+     */
+    @SuppressWarnings("unchecked")
+    public JSONObject toJSONObject(String timeZoneId) {
+        JSONObject json = new JSONObject();
+
+        json.put(JsonTags.BULK_RESPONSE_BUNDLE, bundle.toJSONObject());
+        json.put(JsonTags.BULK_RESPONSE_COORDINATOR, coordinator.toJSONObject());
+        json.put(JsonTags.BULK_RESPONSE_ACTION, action.toJSONObject());
+
+        json.put(JsonTags.BUNDLE_JOB_NAME, bundle.getAppName());
+        json.put(JsonTags.BUNDLE_JOB_ID, bundle.getId());
+        json.put(JsonTags.BUNDLE_JOB_STATUS, bundle.getStatus().toString());
+
+        json.put(JsonTags.COORDINATOR_JOB_NAME, coordinator.getAppName());
+        json.put(JsonTags.COORDINATOR_JOB_STATUS, coordinator.getStatus().toString());
+
+        json.put(JsonTags.COORDINATOR_ACTION_ID, action.getId());
+        json.put(JsonTags.COORDINATOR_JOB_ID, action.getJobId());
+        json.put(JsonTags.COORDINATOR_ACTION_NUMBER, action.getActionNumber());
+        json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, action.getExternalId());
+        json.put(JsonTags.COORDINATOR_ACTION_STATUS, action.getStatus().toString());
+        json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, action.getExternalStatus());
+        json.put(JsonTags.COORDINATOR_ACTION_ERROR_CODE, action.getErrorCode());
+        json.put(JsonTags.COORDINATOR_ACTION_ERROR_MESSAGE, action.getErrorMessage());
+        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, action.getCreatedTime().toString());
+        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, action.getNominalTime().toString());
+        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, action.getMissingDependencies());
+
+        return json;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.client.BulkResponse#getBundle()
+     */
+    @Override
+    public BundleJobBean getBundle() {
+        return bundle;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.client.BulkResponse#getCoordinator()
+     */
+    @Override
+    public CoordinatorJobBean getCoordinator() {
+        return coordinator;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.client.BulkResponse#getAction()
+     */
+    @Override
+    public CoordinatorActionBean getAction() {
+        return action;
+    }
+
+    /**
+     * Sets the bundle comprising this bulk response object
+     * @param BundleJobBean
+     */
+    public void setBundle(BundleJobBean bj) {
+        this.bundle = bj;
+    }
+
+    /**
+     * Sets the coordinator comprising this bulk response object
+     * @param CoordinatorJobBean
+     */
+    public void setCoordinator(CoordinatorJobBean cj) {
+        this.coordinator = cj;
+    }
+
+    /**
+     * Sets the coord action comprising this bulk response object
+     * @param CoordinatorActionBean
+     */
+    public void setAction(CoordinatorActionBean ca) {
+        this.action = ca;
+    }
+
+    /**
+     * Convert a nodes list into a JSONArray.
+     *
+     * @param actions nodes list.
+     * @param timeZoneId time zone to use for dates in the JSON array.
+     * @return the corresponding JSON array.
+     */
+    @SuppressWarnings("unchecked")
+    public static JSONArray toJSONArray(List<? extends BulkResponseImpl> responses, String timeZoneId) {
+        JSONArray array = new JSONArray();
+        for (BulkResponseImpl response : responses) {
+            array.add(response.toJSONObject(timeZoneId));
+        }
+        return array;
+    }
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/BulkJobsXCommand.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.command.XCommand;
+import org.apache.oozie.executor.jpa.BulkJPAExecutor;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+
+/**
+ * The command to get a job info for a list of bundle jobs by given filters.
+ */
+public class BulkJobsXCommand extends XCommand<BulkResponseInfo> {
+    private Map<String,List<String>> bulkParams;
+    private int start = 1;
+    private int len = 50;
+
+    /**
+     * The constructor for BundleJobsXCommand
+     *
+     * @param filter the filter string
+     * @param start start location for paging
+     * @param len total length to get
+     */
+    public BulkJobsXCommand(Map<String,List<String>> filter, int start, int length) {
+        super("bundle.job.info", "bundle.job.info", 1);
+        this.bulkParams = filter;
+        this.start = start;
+        this.len = length;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#isLockRequired()
+     */
+    @Override
+    protected boolean isLockRequired() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#getEntityKey()
+     */
+    @Override
+    public String getEntityKey() {
+        return null;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#execute()
+     */
+    @Override
+    protected BulkResponseInfo execute() throws CommandException {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            BulkResponseInfo bulk = null;
+            if (jpaService != null) {
+                bulk = jpaService.execute(new BulkJPAExecutor(bulkParams, start, len));
+            }
+            else {
+                LOG.error(ErrorCode.E0610);
+            }
+            return bulk;
+        }
+        catch (XException ex) {
+            throw new CommandException(ex);
+        }
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/BulkJPAExecutor.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * Load the coordinators for specified bundle in the Coordinator job bean
+ */
+public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> {
+    private Map<String, List<String>> bulkFilter;
+    // defaults
+    private int start = 1;
+    private int len = 50;
+
+    public static final String bundleQuery = "SELECT b.id, b.status FROM BundleJobBean b WHERE b.appName = :appName";
+    // Join query
+    public static final String actionQuery = "SELECT a.id, a.actionNumber, a.errorCode, a.errorMessage, a.externalId, " +
+            "a.externalStatus, a.status, a.createdTimestamp, a.nominalTimestamp, a.missingDependencies, " +
+            "c.id, c.appName, c.status FROM CoordinatorActionBean a, CoordinatorJobBean c " +
+            "WHERE a.jobId = c.id AND c.bundleId = :bundleId ORDER BY a.jobId, a.createdTimestamp";
+    public static final String countQuery = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c ";
+
+    public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) {
+        ParamChecker.notNull(bulkFilter, "bulkFilter");
+        this.bulkFilter = bulkFilter;
+        this.start = start;
+        this.len = len;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "BulkJPAExecutor";
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException {
+        BundleJobBean bundleBean = new BundleJobBean();
+
+        List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>();
+
+        try {
+
+            // Lightweight Query 1 on Bundle level to fetch the bundle job params corresponding to name
+            String bundleName = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE_NAME).get(0);
+            Query q = em.createQuery(bundleQuery);
+            q.setParameter("appName", bundleName);
+            List<Object[]> bundles = (List<Object[]>) q.getResultList();
+            if(bundles.isEmpty()) {
+                throw new JPAExecutorException(ErrorCode.E0603, "No bundle entries found for bundle name: " + bundleName);
+            }
+            if(bundles.size() > 1) { // more than one bundles running with same name - ERROR. Fail fast
+                throw new JPAExecutorException(ErrorCode.E0603, "Non-unique bundles present for same bundle name: " + bundleName);
+            }
+            bundleBean = getBeanForBundleJob(bundles.get(0), bundleName);
+
+            //Join query between coordinator job and coordinator action tables to get entries for specific bundleId only
+            StringBuilder conditionClause = new StringBuilder();
+            conditionClause.append(coordNamesClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD_NAME)));
+            conditionClause.append(statusClause(bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS)));
+            conditionClause.append(timesClause());
+            StringBuilder getActions = new StringBuilder(actionQuery);
+            int offset = getActions.indexOf("ORDER");
+            getActions.insert(offset-1, conditionClause);
+            q = em.createQuery(getActions.toString());
+            q.setParameter("bundleId", bundleBean.getId());
+            // pagination
+            q.setFirstResult(start - 1);
+            q.setMaxResults(len);
+
+            List<Object[]> response = q.getResultList();
+            for (Object[] r : response) {
+                BulkResponseImpl br = getResponseFromObject(bundleBean, r);
+                responseList.add(br);
+            }
+
+            StringBuilder getTotal = new StringBuilder(countQuery);
+            String query = q.toString();
+            getTotal.append(query.substring(query.indexOf("WHERE"), query.indexOf("ORDER")));
+            Query qTotal = em.createQuery(getTotal.toString());
+            qTotal.setParameter("bundleId", bundleBean.getId());
+            long total = ((Long) qTotal.getSingleResult()).longValue();
+
+            BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len , total);
+            return bulk;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e);
+        }
+    }
+
+    // Form the where clause to filter by coordinator names
+    private StringBuilder coordNamesClause(List<String> coordNames) {
+        StringBuilder sb = new StringBuilder();
+        boolean firstVal = true;
+        for (String name : nullToEmpty(coordNames)) {
+            if(firstVal) {
+                sb.append(" AND c.appName IN (\'" + name + "\'");
+                firstVal = false;
+            } else {
+                sb.append(",\'" + name + "\'");
+            }
+        }
+        if(!firstVal) {
+            sb.append(") ");
+        }
+        return sb;
+    }
+
+    // Form the where clause to filter by coord action status
+    private StringBuilder statusClause(List<String> statuses) {
+        StringBuilder sb = new StringBuilder();
+        boolean firstVal = true;
+        for (String status : nullToEmpty(statuses)) {
+            if(firstVal) {
+                sb.append(" AND a.status IN (\'" + status + "\'");
+                firstVal = false;
+            } else {
+                sb.append(",\'" + status + "\'");
+            }
+        }
+        if(!firstVal) {
+            sb.append(") ");
+        } else { // statuses was null. adding default
+            sb.append(" AND a.status IN ('KILLED', 'FAILED') ");
+        }
+        return sb;
+    }
+
+    private StringBuilder timesClause() throws ParseException {
+        StringBuilder sb = new StringBuilder();
+        List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH);
+        if(times != null) {
+            sb.append(" AND a.createdTimestamp >= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+        }
+        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH);
+        if(times != null) {
+            sb.append(" AND a.createdTimestamp <= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+        }
+        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH);
+        if(times != null) {
+            sb.append(" AND a.nominalTimestamp >= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+        }
+        times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH);
+        if(times != null) {
+            sb.append(" AND a.nominalTimestamp <= '" + new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()) + "'");
+        }
+        return sb;
+    }
+
+    private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) {
+        BulkResponseImpl bean = new BulkResponseImpl();
+        CoordinatorJobBean coordBean = new CoordinatorJobBean();
+        CoordinatorActionBean actionBean = new CoordinatorActionBean();
+        if (arr[0] != null) {
+            actionBean.setId((String) arr[0]);
+        }
+        if (arr[1] != null) {
+            actionBean.setActionNumber((Integer) arr[1]);
+        }
+        if (arr[2] != null) {
+            actionBean.setErrorCode((String) arr[2]);
+        }
+        if (arr[3] != null) {
+            actionBean.setErrorMessage((String) arr[3]);
+        }
+        if (arr[4] != null) {
+            actionBean.setExternalId((String) arr[4]);
+        }
+        if (arr[5] != null) {
+            actionBean.setExternalStatus((String) arr[5]);
+        }
+        if (arr[6] != null) {
+            actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6]));
+        }
+        if (arr[7] != null) {
+            actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7]));
+        }
+        if (arr[8] != null) {
+            actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8]));
+        }
+        if (arr[9] != null) {
+            actionBean.setMissingDependencies((String) arr[9]);
+        }
+        if (arr[10] != null) {
+            coordBean.setId((String) arr[10]);
+            actionBean.setJobId((String) arr[10]);
+        }
+        if (arr[11] != null) {
+            coordBean.setAppName((String) arr[11]);
+        }
+        if (arr[12] != null) {
+            coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12]));
+        }
+        bean.setBundle(bundleBean);
+        bean.setCoordinator(coordBean);
+        bean.setAction(actionBean);
+        return bean;
+    }
+
+    private BundleJobBean getBeanForBundleJob(Object[] barr, String name) throws Exception {
+        BundleJobBean bean = new BundleJobBean();
+        if (barr[0] != null) {
+            bean.setId((String) barr[0]);
+        } else {
+            throw new JPAExecutorException(ErrorCode.E0603, "bundleId returned by query is null - cannot retrieve bulk results");
+        }
+        bean.setAppName(name);
+        if (barr[1] != null) {
+            bean.setStatus(BundleJob.Status.valueOf((String) barr[1]));
+        }
+        return bean;
+    }
+
+    // null safeguard
+    public static List<String> nullToEmpty(List<String> input) {
+        return input == null ? Collections.<String>emptyList() : input;
+    }
+
+}

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/BaseJobsServlet.java Wed Aug 22 21:32:58 2012
@@ -52,7 +52,8 @@ public abstract class BaseJobsServlet ex
                                                   String.class, false, Arrays.asList("GET")),
                 new JsonRestServlet.ParameterInfo(RestConstants.LEN_PARAM,
                                                   String.class, false, Arrays.asList("GET")),
-
+                new JsonRestServlet.ParameterInfo(RestConstants.JOBS_BULK_PARAM,
+                                                  String.class, false, Arrays.asList("GET")),
                 new JsonRestServlet.ParameterInfo(
                         RestConstants.JOBS_EXTERNAL_ID_PARAM, String.class,
                         false, Arrays.asList("GET"))));

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/servlet/V1JobsServlet.java Wed Aug 22 21:32:58 2012
@@ -24,6 +24,8 @@ import javax.servlet.http.HttpServletReq
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.BaseEngineException;
+import org.apache.oozie.BulkResponseInfo;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.BundleJobInfo;
 import org.apache.oozie.CoordinatorEngine;
@@ -38,6 +40,7 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.WorkflowsInfo;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.rest.BulkResponseImpl;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.service.CoordinatorEngineService;
@@ -127,17 +130,22 @@ public class V1JobsServlet extends BaseJ
     @Override
     protected JSONObject getJobs(HttpServletRequest request) throws XServletException, IOException {
         JSONObject json = null;
-        String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
-        jobtype = (jobtype != null) ? jobtype : "wf";
+        String isBulk = request.getParameter(RestConstants.JOBS_BULK_PARAM);
+        if(isBulk != null) {
+            json = getBulkJobs(request);
+        } else {
+            String jobtype = request.getParameter(RestConstants.JOBTYPE_PARAM);
+            jobtype = (jobtype != null) ? jobtype : "wf";
 
-        if (jobtype.contains("wf")) {
-            json = getWorkflowJobs(request);
-        }
-        else if (jobtype.contains("coord")) {
-            json = getCoordinatorJobs(request);
-        }
-        else if (jobtype.contains("bundle")) {
-            json = getBundleJobs(request);
+            if (jobtype.contains("wf")) {
+                json = getWorkflowJobs(request);
+            }
+            else if (jobtype.contains("coord")) {
+                json = getCoordinatorJobs(request);
+            }
+            else if (jobtype.contains("bundle")) {
+                json = getBundleJobs(request);
+            }
         }
         return json;
     }
@@ -371,6 +379,37 @@ public class V1JobsServlet extends BaseJ
         return json;
     }
 
+    @SuppressWarnings("unchecked")
+    private JSONObject getBulkJobs(HttpServletRequest request) throws XServletException, IOException {
+        JSONObject json = new JSONObject();
+        try {
+            String bulkFilter = request.getParameter(RestConstants.JOBS_BULK_PARAM); //REST API
+            String startStr = request.getParameter(RestConstants.OFFSET_PARAM);
+            String lenStr = request.getParameter(RestConstants.LEN_PARAM);
+            String timeZoneId = request.getParameter(RestConstants.TIME_ZONE_PARAM) == null
+                    ? "GMT" : request.getParameter(RestConstants.TIME_ZONE_PARAM);
+            int start = (startStr != null) ? Integer.parseInt(startStr) : 1;
+            start = (start < 1) ? 1 : start;
+            int len = (lenStr != null) ? Integer.parseInt(lenStr) : 50;
+            len = (len < 1) ? 50 : len;
+
+            BundleEngine bundleEngine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request),
+                    getAuthToken(request));
+            BulkResponseInfo bulkResponse = bundleEngine.getBulkJobs(bulkFilter, start, len);
+            List<BulkResponseImpl> jsonResponse = bulkResponse.getResponses();
+
+            json.put(JsonTags.BULK_RESPONSES, BulkResponseImpl.toJSONArray(jsonResponse, timeZoneId));
+            json.put(JsonTags.BULK_RESPONSE_TOTAL, bulkResponse.getTotal());
+            json.put(JsonTags.BULK_RESPONSE_OFFSET, bulkResponse.getStart());
+            json.put(JsonTags.BULK_RESPONSE_LEN, bulkResponse.getLen());
+
+        }
+        catch (BaseEngineException ex) {
+            throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex);
+        }
+        return json;
+    }
+
     /**
      * service implementation to submit a http job
      */

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/util/DateUtils.java Wed Aug 22 21:32:58 2012
@@ -38,7 +38,7 @@ public class DateUtils {
 
     public static final TimeZone UTC = getTimeZone("UTC");
 
-    private static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
+    public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
     private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm";
 
     private static String ACTIVE_MASK = ISO8601_UTC_MASK;

Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkMonitorJPAExecutor.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.oozie.BulkResponseInfo;
+import org.apache.oozie.BundleEngine;
+import org.apache.oozie.BundleEngineException;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.rest.BulkResponseImpl;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+
+public class TestBulkMonitorJPAExecutor extends XDataTestCase {
+    Services services;
+    JPAService jpaService;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+        LocalOozie.start();
+        jpaService = Services.get().get(JPAService.class);
+        addRecordsForBulkMonitor();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        LocalOozie.stop();
+        services.destroy();
+        super.tearDown();
+    }
+
+    public void testSingleRecord() throws Exception {
+
+        String request = "bundle=" + bundleName + ";actionstatus=FAILED;"
+                + "startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z";
+
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(1, brList.size()); // only 1 action satisfies the
+                                        // conditions
+        BulkResponseImpl br = brList.get(0);
+        assertEquals(bundleName, br.getBundle().getAppName());
+        assertEquals("Coord1", br.getCoordinator().getAppName());
+        assertEquals(CoordinatorAction.Status.FAILED, br.getAction().getStatus());
+        assertEquals(DateUtils.parseDateUTC(CREATE_TIME).toString(), br.getAction().getCreatedTime().toString());
+    }
+
+    public void testMultipleRecords() throws Exception {
+
+        String request = "bundle=" + bundleName + ";actionstatus=FAILED,KILLED;"
+                + "startcreatedtime=2012-07-21T00:00Z;endcreatedtime=2012-07-22T02:00Z";
+
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(3, brList.size()); // 3 actions satisfy the conditions
+        List<String> possibleStatus = new ArrayList<String>(Arrays.asList("KILLED", "FAILED"));
+        List<String> resultStatus = new ArrayList<String>();
+        resultStatus.add(brList.get(0).getAction().getStatus().toString());
+        resultStatus.add(brList.get(1).getAction().getStatus().toString());
+        assertEquals(possibleStatus, resultStatus);
+    }
+
+    public void testJavaNoRecords() throws Exception {
+
+        String request = "bundle=BUNDLE-ABC;actionstatus=FAILED";
+
+        BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+        try {
+            jpaService.execute(bulkjpa);
+            fail(); // exception expected due to no records found for this
+                    // bundle
+        }
+        catch (JPAExecutorException jex) {
+            assertTrue(jex.getMessage().contains("No bundle entries found"));
+        }
+    }
+
+    public void testMultipleCoordinators() throws Exception {
+        // there are 3 coordinators but giving range as only two of them
+        String request = "bundle=" + bundleName + ";coordinators=Coord1,Coord2;actionstatus=KILLED";
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(2, brList.size()); // 2 actions satisfy the conditions
+        assertEquals(brList.get(0).getAction().getId(), "Coord1@2");
+        assertEquals(brList.get(1).getAction().getId(), "Coord2@1");
+    }
+
+    public void testDefaultStatus() throws Exception {
+        // adding coordinator action #4 to Coord#3
+        addRecordToCoordActionTable("Coord3", 1, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0);
+
+        String request = "bundle=" + bundleName + ";";
+        List<BulkResponseImpl> brList = _execQuery(request);
+        assertEquals(4, brList.size()); // 4 actions satisfy the conditions
+        List<String> possibleStatus = new ArrayList<String>(Arrays.asList("FAILED", "KILLED"));
+        List<String> resultStatus = new ArrayList<String>();
+        resultStatus.add(brList.get(0).getAction().getStatus().toString());
+        resultStatus.add(brList.get(1).getAction().getStatus().toString());
+        assertEquals(possibleStatus, resultStatus);
+    }
+
+    public void testMultipleBundleIdsForName() throws Exception {
+        // Adding another bundle having same name
+        BundleJobBean bundle = new BundleJobBean();
+        bundle.setId("00002-12345-B");
+        bundle.setAppName(bundleName);
+        bundle.setStatus(BundleJob.Status.RUNNING);
+        bundle.setStartTime(new Date());
+        BundleJobInsertJPAExecutor bundleInsert = new BundleJobInsertJPAExecutor(bundle);
+
+        jpaService.execute(bundleInsert);
+        String request = "bundle=" + bundleName;
+        BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+        try {
+            jpaService.execute(bulkjpa);
+            fail(); // exception expected due to >1 records found for same
+                    // bundle name
+        }
+        catch (JPAExecutorException jex) {
+            assertTrue(jex.getMessage().contains("Non-unique bundles present for same bundle name"));
+        }
+    }
+
+    private List<BulkResponseImpl> _execQuery(String request) throws JPAExecutorException, BundleEngineException {
+        BulkJPAExecutor bulkjpa = new BulkJPAExecutor(BundleEngine.parseBulkFilter(request), 1, 10);
+        BulkResponseInfo response = jpaService.execute(bulkjpa);
+        assertNotNull(response);
+        return response.getResponses();
+    }
+
+}
\ No newline at end of file

Added: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java?rev=1376264&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java (added)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/servlet/TestBulkMonitorWebServiceAPI.java Wed Aug 22 21:32:58 2012
@@ -0,0 +1,312 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.servlet;
+
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.BundleJobBean;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.rest.JsonTags;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.service.AuthorizationService;
+import org.apache.oozie.service.BundleEngineService;
+import org.apache.oozie.service.ForTestAuthorizationService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.ProxyUserService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.servlet.AuthFilter;
+import org.apache.oozie.servlet.HostnameFilter;
+import org.apache.oozie.servlet.V1JobsServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.JSONValue;
+
+public class TestBulkMonitorWebServiceAPI extends XDataTestCase {
+    private EmbeddedServletContainer container;
+    private String servletPath;
+    Services services;
+    JPAService jpaService;
+
+    static {
+        new V1JobsServlet();
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+        cleanUpDBTables();
+        LocalOozie.start();
+        jpaService = Services.get().get(JPAService.class);
+        addRecordsForBulkMonitor();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        LocalOozie.stop();
+        services.destroy();
+        super.tearDown();
+    }
+
+    private URL createURL(String servletPath, String resource, Map<String, String> parameters) throws Exception {
+        StringBuilder sb = new StringBuilder();
+        sb.append(container.getServletURL(servletPath));
+        if (resource != null && resource.length() > 0) {
+            sb.append("/").append(resource);
+        }
+        if (parameters.size() > 0) {
+            String separator = "?";
+            for (Map.Entry<String, String> param : parameters.entrySet()) {
+                sb.append(separator).append(URLEncoder.encode(param.getKey(), "UTF-8")).append("=")
+                        .append(URLEncoder.encode(param.getValue(), "UTF-8"));
+                separator = "&";
+            }
+        }
+        return new URL(sb.toString());
+    }
+
+    private URL createURL(String resource, Map<String, String> parameters) throws Exception {
+        return createURL(servletPath, resource, parameters);
+    }
+
+    private void runTest(String servletPath, Class servletClass, boolean securityEnabled, Callable<Void> assertions)
+            throws Exception {
+        runTest(new String[] { servletPath }, new Class[] { servletClass }, securityEnabled, assertions);
+    }
+
+    private void runTest(String[] servletPath, Class[] servletClass, boolean securityEnabled, Callable<Void> assertions)
+            throws Exception {
+        Services services = new Services();
+        this.servletPath = servletPath[0];
+        try {
+            String proxyUser = getTestUser();
+            services.getConf().set(ProxyUserService.CONF_PREFIX + proxyUser + ProxyUserService.HOSTS, "*");
+            services.getConf().set(ProxyUserService.CONF_PREFIX + proxyUser + ProxyUserService.GROUPS, "*");
+            services.init();
+            services.getConf().setBoolean(AuthorizationService.CONF_SECURITY_ENABLED, securityEnabled);
+            Services.get().setService(ForTestAuthorizationService.class);
+            Services.get().setService(BundleEngineService.class);
+            container = new EmbeddedServletContainer("oozie");
+            for (int i = 0; i < servletPath.length; i++) {
+                container.addServletEndpoint(servletPath[i], servletClass[i]);
+            }
+            container.addFilter("*", HostnameFilter.class);
+            container.addFilter("*", AuthFilter.class);
+            setSystemProperty("user.name", getTestUser());
+            container.start();
+            assertions.call();
+        }
+        finally {
+            this.servletPath = null;
+            if (container != null) {
+                container.stop();
+            }
+            services.destroy();
+            container = null;
+        }
+    }
+
+    public void testSingleRecord() throws Exception {
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+
+                String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+                        + "actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z";
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(1, array.size());
+                JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
+                JSONObject jcoord = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR);
+                JSONObject jaction = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
+
+                assertNotNull(jbundle);
+                assertNotNull(jcoord);
+                assertNotNull(jaction);
+
+                assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), "BUNDLE-TEST");
+                assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), "Coord1");
+                assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), "RUNNING");
+                assertEquals(jaction.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED");
+                assertEquals((jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString().split(", "))[1],
+                        DateUtils.parseDateUTC(CREATE_TIME).toGMTString());
+                return null;
+            }
+        });
+    }
+
+    public void testMultipleRecords() throws Exception {
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+
+                String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+                        + "actionStatus=FAILED,KILLED;startcreatedtime=2012-07-21T00:00Z";
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(2, array.size());
+                JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
+                assertNotNull(jbundle);
+                JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
+                JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
+
+                assertNotNull(jaction1);
+                assertNotNull(jaction2);
+
+                assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), bundleName);
+                assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED");
+                assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_STATUS), "KILLED");
+
+                return null;
+            }
+        });
+    }
+
+    public void testNoRecords() throws Exception {
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+                String bulkRequest = "bundle=BUNDLE-ABC";
+                Map<String, String> params = new HashMap<String, String>();
+                params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+                params.put(RestConstants.OFFSET_PARAM, "1");
+                params.put(RestConstants.LEN_PARAM, "5");
+                URL url = createURL("", params);
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                // WS call will throw BAD_REQUEST code 400 error because no
+                // records found for this bundle
+                assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+                return null;
+            }
+        });
+    }
+
+    public void testMultipleCoordinators() throws Exception {
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+                // giving range as 2 of the total 3 coordinators
+                String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1,Coord2;actionstatus=KILLED";
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(2, array.size());
+                JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
+                assertNotNull(jbundle);
+                JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
+                JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
+
+                assertNotNull(jaction1);
+                assertNotNull(jaction2);
+
+                assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_ID), "Coord1@2");
+                assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_ID), "Coord2@1");
+
+                return null;
+            }
+        });
+    }
+
+    public void testDefaultStatus() throws Exception {
+
+        // adding coordinator action #4 to Coord#3
+        CoordinatorActionBean action4 = new CoordinatorActionBean();
+        action4.setId("Coord3@1");
+        action4.setStatus(CoordinatorAction.Status.FAILED);
+        action4.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
+        action4.setJobId("Coord3");
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
+        cal.add(Calendar.DATE, -1);
+
+        action4.setNominalTime(cal.getTime());
+        CoordActionInsertJPAExecutor actionInsert = new CoordActionInsertJPAExecutor(action4);
+        jpaService.execute(actionInsert);
+
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+                String bulkRequest = "bundle=" + bundleName;
+                JSONArray array = _requestToServer(bulkRequest);
+
+                assertEquals(4, array.size());
+
+                return null;
+            }
+        });
+    }
+
+    public void testMultipleBundleIdsForName() throws Exception {
+
+        // Adding another bundle having same name
+        BundleJobBean bundle = new BundleJobBean();
+        bundle.setId("00002-12345-B");
+        bundle.setAppName(bundleName);
+        bundle.setStatus(BundleJob.Status.RUNNING);
+        bundle.setStartTime(new Date());
+        BundleJobInsertJPAExecutor bundleInsert = new BundleJobInsertJPAExecutor(bundle);
+        jpaService.execute(bundleInsert);
+
+        runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
+            public Void call() throws Exception {
+                String bulkRequest = "bundle=" + bundleName;
+                Map<String, String> params = new HashMap<String, String>();
+                params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+                params.put(RestConstants.OFFSET_PARAM, "1");
+                params.put(RestConstants.LEN_PARAM, "5");
+                URL url = createURL("", params);
+                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+                // WS call will throw BAD_REQUEST code 400 error because no
+                // records found for this bundle
+                assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
+
+                return null;
+            }
+        });
+    }
+
+    private JSONArray _requestToServer(String bulkRequest) throws Exception {
+        Map<String, String> params = new HashMap<String, String>();
+        params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
+        params.put(RestConstants.OFFSET_PARAM, "1");
+        params.put(RestConstants.LEN_PARAM, "5");
+        URL url = createURL("", params);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setRequestMethod("GET");
+        assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
+        assertTrue(conn.getHeaderField("content-type").startsWith(RestConstants.JSON_CONTENT_TYPE));
+        JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream()));
+        JSONArray array = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
+        return array;
+    }
+
+}
\ No newline at end of file

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Wed Aug 22 21:32:58 2012
@@ -25,6 +25,7 @@ import java.io.OutputStreamWriter;
 import java.io.Reader;
 import java.io.UnsupportedEncodingException;
 import java.io.Writer;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.regex.Matcher;
 
@@ -39,6 +40,7 @@ import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
+import org.apache.oozie.client.BundleJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.Job;
@@ -89,6 +91,9 @@ public abstract class XDataTestCase exte
             + " <sla:qa-contact>abc@yahoo.com</sla:qa-contact>" + " <sla:se-contact>abc@yahoo.com</sla:se-contact>"
             + "</sla:info>";
 
+    protected String bundleName;
+    protected String CREATE_TIME = "2012-07-22T00:00Z";
+
     /**
      * Insert coord job for testing.
      *
@@ -1128,4 +1133,64 @@ public abstract class XDataTestCase exte
         }
     }
 
+    /**
+     * Adds the db records for the Bulk Monitor tests
+     */
+    protected void addRecordsForBulkMonitor() throws Exception {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        // adding the bundle job
+        BundleJobBean bundle = addRecordToBundleJobTable(BundleJob.Status.RUNNING, false);
+        String bundleId = bundle.getId();
+        bundleName = bundle.getAppName();
+
+        // adding coordinator job(s) for this bundle
+        addRecordToCoordJobTableWithBundle(bundleId, "Coord1", CoordinatorJob.Status.RUNNING, true, true, 2);
+        addRecordToCoordJobTableWithBundle(bundleId, "Coord2", CoordinatorJob.Status.RUNNING, true, true, 1);
+        addRecordToCoordJobTableWithBundle(bundleId, "Coord3", CoordinatorJob.Status.RUNNING, true, true, 1);
+
+        // adding coordinator action #1 to Coord#1
+        CoordinatorActionBean action1 = new CoordinatorActionBean();
+        action1.setId("Coord1@1");
+        action1.setStatus(CoordinatorAction.Status.FAILED);
+        action1.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
+        action1.setJobId("Coord1");
+
+        Calendar cal = Calendar.getInstance();
+        cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
+        cal.add(Calendar.DATE, -1);
+
+        action1.setNominalTime(cal.getTime());
+        CoordActionInsertJPAExecutor actionInsert = new CoordActionInsertJPAExecutor(action1);
+        jpaService.execute(actionInsert);
+
+        // adding coordinator action #2 to Coord#1
+        CoordinatorActionBean action2 = new CoordinatorActionBean();
+        action2.setId("Coord1@2");
+        action2.setStatus(CoordinatorAction.Status.KILLED);
+        action2.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
+        action2.setJobId("Coord1");
+
+        cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
+        cal.add(Calendar.DATE, -1);
+
+        action2.setNominalTime(cal.getTime());
+        actionInsert = new CoordActionInsertJPAExecutor(action2);
+        jpaService.execute(actionInsert);
+
+        // adding coordinator action #3 to Coord#2
+        CoordinatorActionBean action3 = new CoordinatorActionBean();
+        action3.setId("Coord2@1");
+        action3.setStatus(CoordinatorAction.Status.KILLED);
+        action3.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
+        action3.setJobId("Coord2");
+
+        cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
+        cal.add(Calendar.DATE, -1);
+
+        action3.setNominalTime(cal.getTime());
+        actionInsert = new CoordActionInsertJPAExecutor(action3);
+        jpaService.execute(actionInsert);
+    }
+
 }

Modified: incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki (original)
+++ incubator/oozie/trunk/docs/src/site/twiki/WebServicesAPI.twiki Wed Aug 22 21:32:58 2012
@@ -786,6 +786,110 @@ Additionally the =start= and =len= param
 Moreover, the =jobtype= parameter could be used to determine what type of job is looking for.
 The valid values of job type are: =workflow=, =coordinator= or =bundle=.
 
+---++++ Jobs information using Bulk API
+
+A HTTP GET request retrieves a bulk response for all actions, corresponding to a particular bundle, that satisfy user specified criteria.
+This is useful for monitoring purposes, where user can find out about the status of downstream jobs with a single bulk request.
+The criteria are used for filtering the actions returned. Valid options (case insensitive) for these request criteria are:
+
+   * bundle: the application name from the bundle definition
+   * coordinators: the application name(s) from the coordinator definition.
+   * actionStatus: the status of coordinator action (Valid values are WAITING, READY, SUBMITTED, RUNNING, SUSPENDED, TIMEDOUT, SUCCEEDED, KILLED, FAILED)
+   * startCreatedTime: the start of the window you want to look at, of the actions' created time
+   * endCreatedTime: the end of above window
+   * startScheduledTime: the start of the window you want to look at, of the actions' scheduled i.e. nominal time.
+   * endScheduledTime: the end of above window
+
+Specifying 'bundle' is REQUIRED. All the rest are OPTIONAL but that might result in thousands of results depending on the size of your job. (pagination comes into play then)
+
+The query will do an AND among all the filter names.
+
+The syntax for the request criteria is <verbatim>[NAME=VALUE][;NAME=VALUE]*</verbatim>
+
+For 'coordinators' and 'actionStatus', if user wants to check for multiple values, they can be passed in a comma-separated manner. *Note*: The query will do an OR among them. Hence no need to repeat the criteria name
+
+All the time parameter values should be specified in ISO8601 format i.e. yyyy-MM-dd'T'HH:mm'Z'
+
+Additionally the =offset= and =len= parameters can be used as usual for pagination. The start parameter is base 1.
+
+*Request:*
+
+<verbatim>
+GET /oozie/v1/jobs?bulk=bundle%3Dmy-bundle-app;coordinators%3Dmy-coord-1,my-coord-5;actionStatus%3DKILLED&offset=1&len=50
+</verbatim>
+
+Note that the filter is URL encoded, its decoded value is <code>user=bansalm</code>. If typing in browser URL, one can type decoded value itself i.e. using '='
+
+*Response:*
+
+<verbatim>
+HTTP/1.1 200 OK
+Content-Type: application/json;charset=UTF-8
+.
+{
+  offset: 1,
+  len: 50,
+  total: 1002,
+**  bulkresponses: [
+**  {
+      bulkbundle:
+	  {
+	    bundleJobName: "my-bundle-app",
+        bundleJobId: "0-200905191240-oozie-B",
+        status: "SUSPENDED",
+      },
+      bulkcoord:
+      {
+        coordJobName: "my-coord-1",
+        status: "SUSPENDED",
+      },
+      bulkaction:
+      {
+        id: "0-200905191240-oozie-C@21",
+        coordJobId: "0-200905191240-oozie-C",
+        actionNumber: 21,
+        externalId: "job_00076_0009",
+        status: "KILLED",
+        externalStatus: "FAILED",
+        errorCode: "E0902",
+        errorMessage: "Input file corrupt",
+        createdTime: "Fri, 02 Jan 2009 00:00:00 GMT",
+        nominalTime: "Thu, 01 Jan 2009 00:00:00 GMT",
+        missingDependencies: "hdfs://nn:port/user/joe/file.txt"
+      },
+    },
+**  {
+      bulkbundle:
+      {
+        bundleJobName: "my-bundle-app",
+        bundleJobId: "0-200905191240-oozie-B",
+        status: "SUSPENDED",
+      }
+      bulkcoord:
+      {
+        coordJobName: "my-coord-5",
+        status: "SUSPENDED",
+      }
+      bulkaction:
+      {
+        id: "0-200905191245-oozie-C@114",
+        coordJobId: "0-200905191245-oozie-C",
+        actionNumber: 114,
+        externalId: "job_00076_0265",
+        status: "KILLED",
+        externalStatus: "KILLED",
+        errorCode: "E0603",
+        errorMessage: "SQL error in operation ...",
+        createdTime: "Fri, 02 Jan 2009 00:00:00 GMT",
+        nominalTime: "Thu, 01 Jan 2009 00:00:00 GMT",
+        missingDependencies:
+      }
+    }
+    ...
+  ]
+}
+</verbatim>
+
 </noautolink>
 
 

Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1376264&r1=1376263&r2=1376264&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Aug 22 21:32:58 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.3.0 release (trunk - unreleased)
 
+OOZIE-848 Bulk Monitoring API - Consolidated view of jobs (mona via virag)
 OOZIE-934 Exception reporting during Services startup is inadequate (mona via virag)
 OOZIE-914 Make sure all commands do their JPA writes within a single JPA executor (mona via virag)
 OOZIE-918 Changing log level from DEBUG to TRACE for trivial log statements (mona via virag)