You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by mo...@apache.org on 2013/04/10 02:51:45 UTC
svn commit: r1466307 [2/2] - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/client/
client/src/main/java/org/apache/oozie/client/event/
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/client/rest/ core/src/main/java/...
Added: oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/event/listener/JobEventListener.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event.listener;
+
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+
+/**
+ * Event listener for Job notification events, defining one callback method per
+ * (job or action type, status transition) pair; subclasses implement all of them
+ */
+public abstract class JobEventListener {
+
+ /**
+ * Initialize the listener
+ */
+ public abstract void init();
+
+ /**
+ * Destroy the listener
+ */
+ public abstract void destroy();
+
+ /**
+ * On workflow job transition to start state
+ * @param wje the workflow job event being reported
+ */
+ public abstract void onWorkflowJobStart(WorkflowJobEvent wje);
+
+ /**
+ * On workflow job transition to success state
+ * @param wje the workflow job event being reported
+ */
+ public abstract void onWorkflowJobSuccess(WorkflowJobEvent wje);
+
+ /**
+ * On workflow job transition to failure state
+ * @param wje the workflow job event being reported
+ */
+ public abstract void onWorkflowJobFailure(WorkflowJobEvent wje);
+
+ /**
+ * On workflow job transition to suspend state
+ * @param wje the workflow job event being reported
+ */
+ public abstract void onWorkflowJobSuspend(WorkflowJobEvent wje);
+
+ /**
+ * On workflow action transition to start state
+ * @param wae the workflow action event being reported
+ */
+ public abstract void onWorkflowActionStart(WorkflowActionEvent wae);
+
+ /**
+ * On workflow action transition to success state
+ * @param wae the workflow action event being reported
+ */
+ public abstract void onWorkflowActionSuccess(WorkflowActionEvent wae);
+
+ /**
+ * On workflow action transition to failure state
+ * @param wae the workflow action event being reported
+ */
+ public abstract void onWorkflowActionFailure(WorkflowActionEvent wae);
+
+ /**
+ * On workflow action transition to suspend state
+ * @param wae the workflow action event being reported
+ */
+ public abstract void onWorkflowActionSuspend(WorkflowActionEvent wae);
+
+ /**
+ * On coord job transition to start state
+ * @param wje the coordinator job event being reported
+ */
+ public abstract void onCoordinatorJobStart(CoordinatorJobEvent wje);
+
+ /**
+ * On coord job transition to success state
+ * @param wje the coordinator job event being reported
+ */
+ public abstract void onCoordinatorJobSuccess(CoordinatorJobEvent wje);
+
+ /**
+ * On coord job transition to failure state
+ * @param wje the coordinator job event being reported
+ */
+ public abstract void onCoordinatorJobFailure(CoordinatorJobEvent wje);
+
+ /**
+ * On coord job transition to suspend state
+ * @param wje the coordinator job event being reported
+ */
+ public abstract void onCoordinatorJobSuspend(CoordinatorJobEvent wje);
+
+ /**
+ * On coord action transition to waiting state
+ * @param wae the coordinator action event being reported
+ */
+ public abstract void onCoordinatorActionWaiting(CoordinatorActionEvent wae);
+
+ /**
+ * On coord action transition to start state
+ * @param wae the coordinator action event being reported
+ */
+ public abstract void onCoordinatorActionStart(CoordinatorActionEvent wae);
+
+ /**
+ * On coord action transition to success state
+ * @param wae the coordinator action event being reported
+ */
+ public abstract void onCoordinatorActionSuccess(CoordinatorActionEvent wae);
+
+ /**
+ * On coord action transition to failure state
+ * @param wae the coordinator action event being reported
+ */
+ public abstract void onCoordinatorActionFailure(CoordinatorActionEvent wae);
+
+ /**
+ * On coord action transition to suspend state
+ * @param wae the coordinator action event being reported
+ */
+ public abstract void onCoordinatorActionSuspend(CoordinatorActionEvent wae);
+
+ /**
+ * On bundle job transition to start state
+ * @param wje the bundle job event being reported
+ */
+ public abstract void onBundleJobStart(BundleJobEvent wje);
+
+ /**
+ * On bundle job transition to success state
+ * @param wje the bundle job event being reported
+ */
+ public abstract void onBundleJobSuccess(BundleJobEvent wje);
+
+ /**
+ * On bundle job transition to failure state
+ * @param wje the bundle job event being reported
+ */
+ public abstract void onBundleJobFailure(BundleJobEvent wje);
+
+ /**
+ * On bundle job transition to suspend state
+ * @param wje the bundle job event being reported
+ */
+ public abstract void onBundleJobSuspend(BundleJobEvent wje);
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForCheckJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -90,6 +90,12 @@ public class CoordActionGetForCheckJPAEx
if (arr[6] != null){
bean.setSlaXml((String) arr[6]);
}
+ if (arr[7] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
+ }
+ if (arr[8] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[8]));
+ }
return bean;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForExternalIdJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -94,6 +94,12 @@ public class CoordActionGetForExternalId
if (arr[6] != null){
bean.setSlaXml((String) arr[6]);
}
+ if (arr[7] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[7]));
+ }
+ if (arr[8] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[8]));
+ }
return bean;
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForStartJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -17,12 +17,15 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
/**
@@ -95,6 +98,12 @@ public class CoordActionGetForStartJPAEx
if (arr[9] != null) {
bean.setErrorCode((String) arr[9]);
}
+ if (arr[10] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[10]));
+ }
+ if (arr[11] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[11]));
+ }
return bean;
}
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionGetForTimeoutJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -17,12 +17,15 @@
*/
package org.apache.oozie.executor.jpa;
+import java.sql.Timestamp;
+
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
/**
@@ -80,6 +83,12 @@ public class CoordActionGetForTimeoutJPA
if (arr[4] != null) {
bean.setPending((Integer) arr[4]);
}
+ if (arr[5] != null){
+ bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5]));
+ }
+ if (arr[6] != null){
+ bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6]));
+ }
return bean;
}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/executor/jpa/CoordinatorJobGetForUserAppnameJPAExecutor.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.executor.jpa;
+
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * DB query executor to fetch columns 'user' and 'appName' from Coordinator Job table
+ */
+public class CoordinatorJobGetForUserAppnameJPAExecutor implements JPAExecutor<CoordinatorJobBean> {
+
+    // Assigned exactly once in the constructor, hence final.
+    private final String coordJobId;
+
+    /**
+     * @param coordJobId id of the coordinator job to look up; checked with
+     *        {@code ParamChecker.notNull} before being stored
+     */
+    public CoordinatorJobGetForUserAppnameJPAExecutor(String coordJobId) {
+        ParamChecker.notNull(coordJobId, "coordJobId");
+        this.coordJobId = coordJobId;
+    }
+
+    /**
+     * @return the fixed executor name {@code "CoordJobGetForUserAppnameJPAExecutor"}
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
+     */
+    @Override
+    public String getName() {
+        return "CoordJobGetForUserAppnameJPAExecutor";
+    }
+
+    /**
+     * Runs the named query {@code GET_COORD_JOB_FOR_USER_APPNAME} with the job id bound
+     * to the {@code id} parameter and copies the two result columns into a new bean.
+     *
+     * @param em entity manager used to create and execute the named query
+     * @return a new bean on which only the user (result column 0) and the app name
+     *         (result column 1) are set, each only when the column is non-null
+     * @throws JPAExecutorException with {@code ErrorCode.E0603}, wrapping any exception
+     *         raised while running the query or reading its result
+     * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.EntityManager)
+     */
+    @Override
+    public CoordinatorJobBean execute(EntityManager em) throws JPAExecutorException {
+        try {
+            Query q = em.createNamedQuery("GET_COORD_JOB_FOR_USER_APPNAME");
+            q.setParameter("id", coordJobId);
+            Object[] arr = (Object[]) q.getSingleResult();
+            CoordinatorJobBean bean = new CoordinatorJobBean();
+            if (arr[0] != null) {
+                bean.setUser((String) arr[0]);
+            }
+            if (arr[1] != null) {
+                bean.setAppName((String) arr[1]);
+            }
+            return bean;
+        }
+        catch (Exception e) {
+            throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
+        }
+    }
+
+}
Added: oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/EventHandlerService.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.client.event.Event;
+import org.apache.oozie.client.event.Event.MessageType;
+import org.apache.oozie.client.event.JobEvent;
+import org.apache.oozie.event.EventQueue;
+import org.apache.oozie.event.MemoryEventQueue;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.client.event.SLAEvent;
+import org.apache.oozie.sla.event.listener.SLAEventListener;
+import org.apache.oozie.util.XLog;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Service class that handles the events system - creating events queue,
+ * managing configured properties and managing and invoking various event
+ * listeners via worker threads
+ */
+public class EventHandlerService implements Service {
+
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "EventHandlerService.";
+ public static final String CONF_QUEUE_SIZE = CONF_PREFIX + "queue.size";
+ public static final String CONF_EVENT_QUEUE = CONF_PREFIX + "event.queue";
+ public static final String CONF_LISTENERS = CONF_PREFIX + "event.listeners";
+ public static final String CONF_APP_TYPES = CONF_PREFIX + "app.types";
+ public static final String CONF_BATCH_SIZE = CONF_PREFIX + "batch.size";
+ public static final String CONF_WORKER_INTERVAL = CONF_PREFIX + "worker.interval";
+
+ private static EventQueue eventQueue;
+ private int queueMaxSize;
+ private XLog LOG;
+ private static Map<MessageType, List<?>> listenerMap = new HashMap<MessageType, List<?>>();
+ private Set<String> apptypes;
+ private static int batchSize;
+ private static boolean eventsConfigured = false;
+
+ @SuppressWarnings("unchecked")
+ public void init(Services services) throws ServiceException {
+ try {
+ eventsConfigured = true;
+ Configuration conf = services.getConf();
+ queueMaxSize = conf.getInt(CONF_QUEUE_SIZE, 10000);
+ LOG = XLog.getLog(getClass());
+ Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null);
+ eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance();
+ batchSize = conf.getInt(CONF_BATCH_SIZE, 10);
+ eventQueue.init(queueMaxSize, batchSize);
+ // initialize app-types to switch on events for
+ initApptypes(conf);
+ // initialize event listeners
+ initEventListeners(conf);
+ // initialize worker threads via Scheduler
+ initWorkerThreads(conf, services);
+ LOG.info(
+ "EventHandlerService initialized. Event queue = [{0}], Event listeners configured = [{1}], Events configured for App-types = [{3}]",
+ eventQueue.getClass().getName(), listenerMap.toString(), apptypes);
+ }
+ catch (Exception ex) {
+ throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
+ }
+ }
+
+ private void initApptypes(Configuration conf) {
+ apptypes = new HashSet<String>();
+ for (String jobtype : conf.getStringCollection(CONF_APP_TYPES)) {
+ String tmp = jobtype.trim().toLowerCase();
+ if (tmp.length() == 0) {
+ continue;
+ }
+ apptypes.add(tmp);
+ }
+ }
+
+ private void initEventListeners(Configuration conf) {
+ Class<?>[] listenerClass = (Class<?>[]) conf.getClasses(CONF_LISTENERS);
+ for (int i = 0; i < listenerClass.length; i++) {
+ Object listener = null;
+ try {
+ listener = listenerClass[i].newInstance();
+ }
+ catch (InstantiationException e) {
+ LOG.warn("Could not create event listener instance, " + e);
+ }
+ catch (IllegalAccessException e) {
+ LOG.warn("Illegal access to event listener instance, " + e);
+ }
+ addEventListener(listener);
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public void addEventListener(Object listener) {
+ if (listener instanceof JobEventListener) {
+ List listenersList = listenerMap.get(MessageType.JOB);
+ if (listenersList == null) {
+ listenersList = new ArrayList();
+ listenerMap.put(MessageType.JOB, (List<? extends JobEventListener>) listenersList);
+ }
+ listenersList.add(listener);
+ ((JobEventListener) listener).init();
+ }
+ else if (listener instanceof SLAEventListener) {
+ List listenersList = listenerMap.get(MessageType.SLA);
+ if (listenersList == null) {
+ listenersList = new ArrayList();
+ listenerMap.put(MessageType.SLA, (List<? extends SLAEventListener>) listenersList);
+ }
+ listenersList.add(listener);
+ ((SLAEventListener) listener).init();
+ }
+ else {
+ LOG.warn("Event listener [{0}] is of undefined type", listener.getClass().getCanonicalName());
+ }
+ }
+
+ public static boolean isEventsConfigured() {
+ return eventsConfigured;
+ }
+
+ private void initWorkerThreads(Configuration conf, Services services) {
+ Runnable eventWorker = new EventWorker();
+ // schedule runnable by default every 5 min
+ services.get(SchedulerService.class).schedule(eventWorker, 10, conf.getInt(CONF_WORKER_INTERVAL, 60),
+ SchedulerService.Unit.SEC);
+ }
+
+ @Override
+ public void destroy() {
+ for (MessageType type : listenerMap.keySet()) {
+ Iterator<?> iter = listenerMap.get(type).iterator();
+ while (iter.hasNext()) {
+ if (type == MessageType.JOB) {
+ ((JobEventListener) iter.next()).destroy();
+ }
+ else if (type == MessageType.SLA) {
+ ((SLAEventListener) iter.next()).destroy();
+ }
+ }
+ }
+ eventsConfigured = false;
+ }
+
+ @Override
+ public Class<? extends Service> getInterface() {
+ return EventHandlerService.class;
+ }
+
+ public boolean checkSupportedApptype(String appType) {
+ if (!apptypes.contains(appType.toLowerCase())) {
+ return false;
+ }
+ return true;
+ }
+
+ public void setAppTypes(Set<String> types) {
+ apptypes = types;
+ }
+
+ public Set<String> getAppTypes() {
+ return apptypes;
+ }
+
+ public void queueEvent(Event event) {
+ eventQueue.add(event);
+ }
+
+ public EventQueue getEventQueue() {
+ return eventQueue;
+ }
+
+ public class EventWorker implements Runnable {
+
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ if (!eventQueue.isEmpty()) {
+ Set<Event> work = eventQueue.pollBatch();
+ for (Event event : work) {
+ MessageType msgType = event.getMsgType();
+ List<?> listeners = listenerMap.get(msgType);
+ if (listeners != null) {
+ Iterator<?> iter = listeners.iterator();
+ while (iter.hasNext()) {
+ if (msgType == MessageType.JOB) {
+ invokeJobEventListener(iter, (JobEvent) event);
+ }
+ else if (msgType == MessageType.SLA) {
+ invokeSLAEventListener(iter, (SLAEvent) event);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void invokeJobEventListener(Iterator<?> iter, JobEvent event) {
+ JobEventListener el = (JobEventListener) iter.next();
+ switch (event.getAppType()) {
+ case WORKFLOW_JOB:
+ onWorkflowJobEvent(event, el);
+ break;
+ case WORKFLOW_ACTION:
+ onWorkflowActionEvent(event, el);
+ break;
+ case COORDINATOR_JOB:
+ onCoordinatorJobEvent(event, el);
+ break;
+ case COORDINATOR_ACTION:
+ onCoordinatorActionEvent(event, el);
+ break;
+ case BUNDLE_JOB:
+ onBundleJobEvent(event, el);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined Job Event app-type - {0}",
+ event.getAppType());
+ }
+ }
+
+ private void invokeSLAEventListener(Iterator<?> iter, SLAEvent event) {
+ SLAEventListener sel = (SLAEventListener) iter.next();
+ switch (event.getEventType()) {
+ case START_MET:
+ sel.onStartMet((SLAEvent) event);
+ break;
+ case START_MISS:
+ sel.onStartMiss((SLAEvent) event);
+ break;
+ case END_MET:
+ sel.onEndMet((SLAEvent) event);
+ break;
+ case END_MISS:
+ sel.onEndMiss((SLAEvent) event);
+ break;
+ case DURATION_MET:
+ sel.onDurationMet((SLAEvent) event);
+ break;
+ case DURATION_MISS:
+ sel.onDurationMiss((SLAEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined SLA event type - {0}", event.getEventType());
+ }
+ }
+
+ private void onWorkflowJobEvent(JobEvent event, JobEventListener el) {
+ switch (event.getEventStatus()) {
+ case STARTED:
+ el.onWorkflowJobStart((WorkflowJobEvent) event);
+ break;
+ case SUCCESS:
+ el.onWorkflowJobSuccess((WorkflowJobEvent) event);
+ break;
+ case FAILURE:
+ el.onWorkflowJobFailure((WorkflowJobEvent) event);
+ break;
+ case SUSPEND:
+ el.onWorkflowJobSuspend((WorkflowJobEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined WF Job event-status - {0}",
+ event.getEventStatus());
+ }
+ }
+
+ private void onWorkflowActionEvent(JobEvent event, JobEventListener el) {
+ switch (event.getEventStatus()) {
+ case STARTED:
+ el.onWorkflowActionStart((WorkflowActionEvent) event);
+ break;
+ case SUCCESS:
+ el.onWorkflowActionSuccess((WorkflowActionEvent) event);
+ break;
+ case FAILURE:
+ el.onWorkflowActionFailure((WorkflowActionEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined WF action event-status - {0}",
+ event.getEventStatus());
+ }
+ }
+
+ private void onCoordinatorJobEvent(JobEvent event, JobEventListener el) {
+ switch (event.getEventStatus()) {
+ case STARTED:
+ el.onCoordinatorJobStart((CoordinatorJobEvent) event);
+ break;
+ case SUCCESS:
+ el.onCoordinatorJobSuccess((CoordinatorJobEvent) event);
+ break;
+ case FAILURE:
+ el.onCoordinatorJobFailure((CoordinatorJobEvent) event);
+ break;
+ case SUSPEND:
+ el.onCoordinatorJobSuspend((CoordinatorJobEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined Coord Job event-status - {0}",
+ event.getEventStatus());
+ }
+ }
+
+ private void onCoordinatorActionEvent(JobEvent event, JobEventListener el) {
+ switch (event.getEventStatus()) {
+ case WAITING:
+ el.onCoordinatorActionWaiting((CoordinatorActionEvent) event);
+ break;
+ case STARTED:
+ el.onCoordinatorActionStart((CoordinatorActionEvent) event);
+ break;
+ case SUCCESS:
+ el.onCoordinatorActionSuccess((CoordinatorActionEvent) event);
+ break;
+ case FAILURE:
+ el.onCoordinatorActionFailure((CoordinatorActionEvent) event);
+ break;
+ case SUSPEND:
+ el.onCoordinatorActionSuspend((CoordinatorActionEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined Coord action event-status - {0}",
+ event.getEventStatus());
+ }
+ }
+
+ private void onBundleJobEvent(JobEvent event, JobEventListener el) {
+ switch (event.getEventStatus()) {
+ case STARTED:
+ el.onBundleJobStart((BundleJobEvent) event);
+ break;
+ case SUCCESS:
+ el.onBundleJobSuccess((BundleJobEvent) event);
+ break;
+ case FAILURE:
+ el.onBundleJobFailure((BundleJobEvent) event);
+ break;
+ case SUSPEND:
+ el.onBundleJobSuspend((BundleJobEvent) event);
+ break;
+ default:
+ XLog.getLog(EventHandlerService.class).info("Undefined Bundle Job event-type - {0}",
+ event.getEventStatus());
+ }
+ }
+
+ }
+
+}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Wed Apr 10 00:51:43 2013
@@ -256,7 +256,7 @@ public class RecoveryService implements
CoordinatorJobBean coordJob = jpaService
.execute(new CoordJobGetJPAExecutor(caction.getJobId()));
queueCallable(new CoordActionStartXCommand(caction.getId(), coordJob.getUser(),
- coordJob.getAuthToken(), caction.getJobId()));
+ coordJob.getAppName(), coordJob.getAuthToken(), caction.getJobId()));
log.info("Recover a SUBMITTED coord action and resubmit CoordActionStartCommand :"
+ caction.getId());
Added: oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java (added)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/sla/event/listener/SLAEventListener.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.sla.event.listener;
+
+import org.apache.oozie.client.event.SLAEvent;
+
+/**
+ * Event listener for SLA related events, defining one callback method per
+ * SLA met/miss outcome; subclasses implement all of them
+ */
+public abstract class SLAEventListener {
+
+ /**
+ * Initialize the listener
+ */
+ public abstract void init();
+
+ /**
+ * Destroy the listener
+ */
+ public abstract void destroy();
+
+ /**
+ * on SLA job start-time limit met
+ * @param work the SLA event being reported
+ */
+ public abstract void onStartMet(SLAEvent work);
+
+ /**
+ * on SLA job start-time limit missed
+ * @param event the SLA event being reported
+ */
+ public abstract void onStartMiss(SLAEvent event);
+
+ /**
+ * on SLA job end-time limit met
+ * @param work the SLA event being reported
+ */
+ public abstract void onEndMet(SLAEvent work);
+
+ /**
+ * on SLA job end-time limit missed
+ * @param event the SLA event being reported
+ */
+ public abstract void onEndMiss(SLAEvent event);
+
+ /**
+ * on SLA job duration limit met
+ * @param work the SLA event being reported
+ */
+ public abstract void onDurationMet(SLAEvent work);
+
+ /**
+ * on SLA job duration limit missed
+ * @param event the SLA event being reported
+ */
+ public abstract void onDurationMiss(SLAEvent event);
+
+}
Modified: oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ oozie/trunk/core/src/main/resources/oozie-default.xml Wed Apr 10 00:51:43 2013
@@ -1785,4 +1785,46 @@
</description>
</property>
+ <property>
+ <name>oozie.service.EventHandlerService.app.types</name>
+ <value>workflow_job, coordinator_action</value>
+ <description>
+ The app-types among workflow/coordinator/bundle job/action for
+ which the events system is enabled.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.EventHandlerService.event.queue</name>
+ <value>org.apache.oozie.event.MemoryEventQueue</value>
+ <description>
+ The implementation for EventQueue in use by the EventHandlerService.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.EventHandlerService.queue.size</name>
+ <value>10000</value>
+ <description>
+ Maximum number of events to be contained in the event queue.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.EventHandlerService.worker.interval</name>
+ <value>60</value>
+ <description>
+ The default interval (seconds) at which the worker threads will be scheduled to run
+ and process events.
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.service.EventHandlerService.batch.size</name>
+ <value>10</value>
+ <description>
+ The batch size for batched draining per thread from the event queue.
+ </description>
+ </property>
+
</configuration>
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionStartXCommand.java Wed Apr 10 00:51:43 2013
@@ -87,7 +87,7 @@ public class TestCoordActionStartXComman
public void testActionStartCommand() throws IOException, JPAExecutorException, CommandException {
String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
addRecordToActionTable(actionId, 1, null);
- new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
+ new CoordActionStartXCommand(actionId, "me", "myapp", "mytoken", "myjob").call();
checkCoordAction(actionId);
}
@@ -104,7 +104,7 @@ public class TestCoordActionStartXComman
String actionId = new Date().getTime() + "-COORD-ActionStartCommand-C@1";
String wfApp = "<start to='${someParam}' />";
addRecordToActionTable(actionId, 1, wfApp);
- new CoordActionStartXCommand(actionId, "me", "mytoken", "myjob").call();
+ new CoordActionStartXCommand(actionId, "me", "myapp", "mytoken", "myjob").call();
final JPAService jpaService = Services.get().get(JPAService.class);
CoordinatorActionBean action = jpaService.execute(new CoordActionGetForStartJPAExecutor(actionId));
if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
@@ -132,7 +132,7 @@ public class TestCoordActionStartXComman
CoordinatorAction.Status.SUBMITTED, "coord-action-start-escape-strings.xml", 0);
String actionId = action.getId();
- new CoordActionStartXCommand(actionId, getTestUser(), "undef","myjob").call();
+ new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "undef", "myjob").call();
final JPAService jpaService = Services.get().get(JPAService.class);
action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
Added: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.event.Event.AppType;
+import org.apache.oozie.client.event.JobEvent.EventStatus;
+import org.apache.oozie.command.coord.CoordActionCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordinatorXCommand;
+import org.apache.oozie.command.wf.KillXCommand;
+import org.apache.oozie.command.wf.ResumeXCommand;
+import org.apache.oozie.command.wf.SignalXCommand;
+import org.apache.oozie.command.wf.StartXCommand;
+import org.apache.oozie.command.wf.SuspendXCommand;
+import org.apache.oozie.command.wf.WorkflowXCommand;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.workflow.WorkflowInstance;
+import org.apache.oozie.workflow.lite.ActionNodeDef;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.LiteWorkflowApp;
+import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
+import org.apache.oozie.workflow.lite.StartNodeDef;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestControlNodeHandler;
+import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
+
+/**
+ * Tests that events are correctly generated by the corresponding commands
+ * and inserted into the event queue.
+ */
+public class TestEventGeneration extends XDataTestCase {
+
+ /**
+  * Creates a new {@link Services} instance, adds the EventHandlerService to
+  * its extension service classes and initializes it.
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     Services services = new Services();
+     Configuration conf = services.getConf();
+     conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+     services.init();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} before
+  * delegating to the superclass tear-down.
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     Services.get().destroy();
+     super.tearDown();
+ }
+
+ /**
+  * Takes a workflow job through start, suspend, resume and kill, then runs a
+  * second job to success through {@link SignalXCommand}. After every transition
+  * it checks that exactly one {@link WorkflowJobEvent} was queued and that the
+  * event carries the job's id, user, app name and times.
+  */
+ public void testWorkflowJobEvent() throws Exception {
+     EventHandlerService ehs = _testEventHandlerService();
+     EventQueue queue = ehs.getEventQueue();
+     // expected value first, so that a failure message reports the right sides
+     assertEquals(0, queue.getCurrentSize());
+     WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
+     JPAService jpaService = Services.get().get(JPAService.class);
+
+     // Starting job
+     new StartXCommand(job.getId()).call();
+     WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
+     job = jpaService.execute(wfJobGetCmd);
+     assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.STARTED, event.getEventStatus());
+     assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+     assertEquals(job.getId(), event.getId());
+     assertEquals(job.getUser(), event.getUser());
+     assertEquals(job.getAppName(), event.getAppName());
+     assertEquals(job.getStartTime(), event.getStartTime());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Suspending job
+     new SuspendXCommand(job.getId()).call();
+     job = jpaService.execute(wfJobGetCmd);
+     assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (WorkflowJobEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.SUSPEND, event.getEventStatus());
+     assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+     assertEquals(job.getId(), event.getId());
+     assertEquals(job.getUser(), event.getUser());
+     assertEquals(job.getAppName(), event.getAppName());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Resuming job
+     new ResumeXCommand(job.getId()).call();
+     job = jpaService.execute(wfJobGetCmd);
+     assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (WorkflowJobEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+     assertEquals(job.getId(), event.getId());
+     assertEquals(job.getUser(), event.getUser());
+     assertEquals(job.getAppName(), event.getAppName());
+     assertEquals(job.getStartTime(), event.getStartTime());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Killing job
+     new KillXCommand(job.getId()).call();
+     job = jpaService.execute(wfJobGetCmd);
+     assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (WorkflowJobEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.FAILURE, event.getEventStatus());
+     assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+     assertEquals(job.getId(), event.getId());
+     assertEquals(job.getUser(), event.getUser());
+     assertEquals(job.getAppName(), event.getAppName());
+     assertEquals(job.getStartTime(), event.getStartTime());
+     assertEquals(job.getEndTime(), event.getEndTime());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Successful job (testing SignalX)
+     job = _createWorkflowJob();
+     LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
+     wfInstance.start();
+     job.setWfInstance(wfInstance);
+     jpaService.execute(new WorkflowJobUpdateJPAExecutor(job));
+     WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
+     new SignalXCommand(job.getId(), wfAction.getId()).call();
+     job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
+     assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (WorkflowJobEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
+     assertEquals(job.getId(), event.getId());
+     assertEquals(job.getUser(), event.getUser());
+     assertEquals(job.getAppName(), event.getAppName());
+     assertEquals(job.getStartTime(), event.getStartTime());
+     assertEquals(job.getEndTime(), event.getEndTime());
+     assertEquals(0, queue.getCurrentSize());
+ }
+
+ /**
+  * Follows one coordinator action from WAITING (materialization) through
+  * STARTED, SUCCESS and FAILURE. After each transition it checks that exactly
+  * one {@link CoordinatorActionEvent} was queued and that the event carries the
+  * action's id, parent job id, nominal time and created time, plus the
+  * coordinator's user and app name.
+  */
+ public void testCoordinatorActionEvent() throws Exception {
+     EventHandlerService ehs = _testEventHandlerService();
+     // reduce noise from WF Job events (also default) by setting it to only
+     // coord action
+     ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
+     EventQueue queue = ehs.getEventQueue();
+     // expected value first, so that a failure message reports the right sides
+     assertEquals(0, queue.getCurrentSize());
+     Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
+     Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:14Z");
+     CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+             false, 0);
+     _modifyCoordForRunning(coord);
+     final JPAService jpaService = Services.get().get(JPAService.class);
+
+     // Action WAITING on materialization
+     new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
+     final CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(coord.getId() + "@1");
+     CoordinatorActionBean action = jpaService.execute(coordGetCmd);
+     assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.WAITING, event.getEventStatus());
+     assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+     assertEquals(action.getId(), event.getId());
+     assertEquals(action.getJobId(), event.getParentId());
+     assertEquals(action.getNominalTime(), event.getNominalTime());
+     assertEquals(action.getCreatedTime(), event.getStartTime());
+     assertNotNull(event.getMissingDeps());
+     assertEquals(coord.getUser(), event.getUser());
+     assertEquals(coord.getAppName(), event.getAppName());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Make Action ready
+     new CoordActionInputCheckXCommand(action.getId(), coord.getId()).call();
+     action = jpaService.execute(coordGetCmd);
+     assertEquals(CoordinatorAction.Status.READY, action.getStatus());
+
+     // poll the stored action until it is reported as RUNNING
+     waitFor(1 * 100, new Predicate() {
+         public boolean evaluate() throws Exception {
+             return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.RUNNING;
+         }
+     });
+     assertEquals(1, queue.getCurrentSize());
+     event = (CoordinatorActionEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.STARTED, event.getEventStatus());
+     assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+     assertEquals(action.getId(), event.getId());
+     assertEquals(action.getJobId(), event.getParentId());
+     assertEquals(action.getNominalTime(), event.getNominalTime());
+     assertEquals(action.getCreatedTime(), event.getStartTime());
+     assertEquals(coord.getUser(), event.getUser());
+     assertEquals(coord.getAppName(), event.getAppName());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Action Success
+     action = jpaService.execute(coordGetCmd);
+     WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
+     wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
+     jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+     new CoordActionCheckXCommand(action.getId(), 0).call();
+     action = jpaService.execute(coordGetCmd);
+     assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (CoordinatorActionEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.SUCCESS, event.getEventStatus());
+     assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+     assertEquals(action.getId(), event.getId());
+     assertEquals(action.getJobId(), event.getParentId());
+     assertEquals(action.getNominalTime(), event.getNominalTime());
+     assertEquals(action.getCreatedTime(), event.getStartTime());
+     assertEquals(coord.getUser(), event.getUser());
+     assertEquals(coord.getAppName(), event.getAppName());
+     assertEquals(0, queue.getCurrentSize());
+
+     // Action Failure: put the action back to RUNNING and mark its workflow KILLED
+     action.setStatus(CoordinatorAction.Status.RUNNING);
+     jpaService.execute(new CoordActionUpdateJPAExecutor(action));
+     wfJob.setStatus(WorkflowJob.Status.KILLED);
+     jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
+     new CoordActionCheckXCommand(action.getId(), 0).call();
+     action = jpaService.execute(coordGetCmd);
+     assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
+     assertEquals(1, queue.getCurrentSize());
+     event = (CoordinatorActionEvent) queue.poll();
+     assertNotNull(event);
+     assertEquals(EventStatus.FAILURE, event.getEventStatus());
+     assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
+     assertEquals(action.getId(), event.getId());
+     assertEquals(action.getJobId(), event.getParentId());
+     assertEquals(action.getNominalTime(), event.getNominalTime());
+     assertEquals(action.getCreatedTime(), event.getStartTime());
+     assertEquals(coord.getUser(), event.getUser());
+     assertEquals(coord.getAppName(), event.getAppName());
+     assertEquals(0, queue.getCurrentSize());
+ }
+
+ /**
+  * Checks that an event generated with an explicit error code and message
+  * exposes both values and reports {@link EventStatus#FAILURE} for a FAILED job.
+  */
+ public void testWorkflowJobEventError() throws Exception {
+     EventQueue eventQueue = _testEventHandlerService().getEventQueue();
+     final WorkflowJobBean failedJob = addRecordToWfJobTable(WorkflowJob.Status.FAILED,
+             WorkflowInstance.Status.FAILED);
+     // KillXCommand with execute() overridden to just generate an event that
+     // carries an error code and message
+     WorkflowXCommand<Void> eventEmitter = new KillXCommand(failedJob.getId()) {
+         @Override
+         protected Void execute() {
+             WorkflowXCommand.generateEvent(failedJob, "errorCode", "errorMsg");
+             return null;
+         }
+     };
+     eventEmitter.call();
+
+     WorkflowJobEvent queued = (WorkflowJobEvent) eventQueue.poll();
+     assertNotNull(queued);
+     assertEquals("errorCode", queued.getErrorCode());
+     assertEquals("errorMsg", queued.getErrorMessage());
+     assertEquals(EventStatus.FAILURE, queued.getEventStatus());
+ }
+
+ /**
+  * Checks the missing-dependencies string reported by a
+  * {@link CoordinatorActionEvent} when the action has only pull dependencies,
+  * only push dependencies, and both kinds.
+  */
+ public void testCoordinatorActionEventDependencies() throws Exception {
+     EventHandlerService handler = _testEventHandlerService();
+     EventQueue eventQueue = handler.getEventQueue();
+     final CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+     final CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1,
+             CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
+     JPAService jpa = Services.get().get(JPAService.class);
+     // CoordActionCheckXCommand with execute() overridden to just generate an
+     // event for coordAction
+     CoordinatorXCommand<Void> eventEmitter = new CoordActionCheckXCommand(coordAction.getId(), 0) {
+         @Override
+         protected Void execute() {
+             CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName());
+             return null;
+         }
+     };
+
+     // CASE 1: Only pull missing deps
+     coordAction.setMissingDependencies("pull");
+     jpa.execute(new CoordActionUpdateJPAExecutor(coordAction));
+     eventEmitter.call();
+     CoordinatorActionEvent queued = (CoordinatorActionEvent) eventQueue.poll();
+     assertNotNull(queued);
+     assertEquals("pull", queued.getMissingDeps());
+
+     // CASE 2: Only hcat (push) missing deps
+     coordAction.setMissingDependencies(null);
+     coordAction.setPushMissingDependencies("push");
+     jpa.execute(new CoordActionUpdateJPAExecutor(coordAction));
+     eventEmitter.call();
+     queued = (CoordinatorActionEvent) eventQueue.poll();
+     assertNotNull(queued);
+     assertEquals("push", queued.getMissingDeps());
+
+     // CASE 3: Both types (push deps are still set from case 2)
+     coordAction.setMissingDependencies("pull");
+     jpa.execute(new CoordActionUpdateJPAExecutor(coordAction));
+     eventEmitter.call();
+     queued = (CoordinatorActionEvent) eventQueue.poll();
+     assertNotNull(queued);
+     String bothDeps = "pull" + CoordELFunctions.INSTANCE_SEPARATOR + "push";
+     assertEquals(bothDeps, queued.getMissingDeps());
+ }
+
+ /**
+  * Looks up the {@link EventHandlerService} from the current {@link Services}
+  * and asserts that it is present.
+  *
+  * @return the registered event handler service
+  */
+ private EventHandlerService _testEventHandlerService() throws Exception {
+     EventHandlerService handler = Services.get().get(EventHandlerService.class);
+     assertNotNull(handler);
+     return handler;
+ }
+
+ /**
+  * Writes the "wf-credentials.xml" resource as workflow.xml into the test case
+  * directory, rewrites the coordinator's job XML to reference that file instead
+  * of "hdfs:///tmp/workflows/", and stores the updated coordinator.
+  *
+  * @param coord coordinator job bean whose job XML is rewritten and persisted
+  */
+ private void _modifyCoordForRunning(CoordinatorJobBean coord) throws Exception {
+     writeToFile(IOUtils.getResourceAsString("wf-credentials.xml", -1), getFsTestCaseDir(), "workflow.xml");
+     String originalXml = coord.getJobXml();
+     String patchedXml = originalXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml");
+     coord.setJobXml(patchedXml);
+     JPAService jpa = Services.get().get(JPAService.class);
+     jpa.execute(new CoordJobUpdateJPAExecutor(coord));
+ }
+
+ /**
+  * Inserts a PREP workflow job for a three-node app (start, action "one", end)
+  * together with a pending action record "one" whose status and signal value
+  * are OK.
+  *
+  * @return the inserted workflow job bean
+  */
+ private WorkflowJobBean _createWorkflowJob() throws Exception {
+     // workflow definition: start -> "one" -> end
+     StartNodeDef startNode = new StartNodeDef(TestControlNodeHandler.class, "one");
+     LiteWorkflowApp wfApp = new LiteWorkflowApp("my-app", "<workflow-app/>", startNode)
+             .addNode(new ActionNodeDef("one", "<java></java>", TestActionNodeHandler.class, "end", "end"))
+             .addNode(new EndNodeDef("end", TestControlNodeHandler.class));
+
+     Configuration jobConf = new Configuration();
+     jobConf.set(OozieClient.APP_PATH, new Path(getAppPath(), "workflow.xml").toString());
+     jobConf.set(OozieClient.LOG_TOKEN, "testToken");
+     jobConf.set(OozieClient.USER_NAME, getTestUser());
+     WorkflowJobBean wfJob = createWorkflow(wfApp, jobConf, "auth", WorkflowJob.Status.PREP,
+             WorkflowInstance.Status.PREP);
+
+     JPAService jpa = Services.get().get(JPAService.class);
+     assertNotNull(jpa);
+     jpa.execute(new WorkflowJobInsertJPAExecutor(wfJob));
+
+     // action "one" at execution path "/", marked pending with an OK signal value
+     WorkflowActionBean actionOne = addRecordToWfActionTable(wfJob.getId(), "one", WorkflowAction.Status.OK,
+             "/");
+     actionOne.setPending();
+     actionOne.setSignalValue(WorkflowAction.Status.OK.name());
+     jpa.execute(new WorkflowActionUpdateJPAExecutor(actionOne));
+
+     return wfJob;
+ }
+
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/event/TestEventQueue.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.event;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/**
+ * Test case to check correct functioning of MemoryEventQueue
+ */
+public class TestEventQueue extends XDataTestCase {
+
+ /**
+  * Creates a new {@link Services} instance, adds the {@link EventHandlerService}
+  * to its extension service classes and initializes it.
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     Services services = new Services();
+     Configuration conf = services.getConf();
+     conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+     services.init();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} before
+  * delegating to the superclass tear-down.
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     Services.get().destroy();
+     super.tearDown();
+ }
+
+ public void testMemoryEventQueueBasic() throws Exception {
+ EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+ assertNotNull(ehs);
+ EventQueue eventQ = ehs.getEventQueue();
+ assertNotNull(eventQ);
+ assertTrue(eventQ instanceof MemoryEventQueue); //default
+ }
+
+ /**
+  * Exercises batched draining of the event queue by {@code EventWorker}s with
+  * the batch size set to 3.
+  *
+  * The workers are wrapped in {@link Thread} objects but invoked through
+  * {@link Thread#run()}, which executes each worker on the calling thread, one
+  * after another. No worker runs in parallel with another, which is why the
+  * exact queue sizes asserted below are deterministic.
+  */
+ public void testQueueOperations() throws Exception {
+     Services services = Services.get();
+     Configuration conf = services.getConf();
+
+     // set smaller batch size for the events queue
+     conf.setInt(EventHandlerService.CONF_BATCH_SIZE, 3);
+     EventHandlerService ehs = Services.get().get(EventHandlerService.class);
+     // re-initialize the service after changing the batch size
+     ehs.init(services);
+     EventQueue eventQ = ehs.getEventQueue();
+     assertEquals(0, eventQ.getCurrentSize());
+     assertEquals(3, eventQ.getBatchSize());
+
+     // create some events to enqueue
+     WorkflowJobEvent wfEvent = new WorkflowJobEvent("1234-W", "1234-C", WorkflowJob.Status.RUNNING, getTestUser(),
+             "myapp", null, null);
+     for (int i = 0; i < 10; i++) {
+         ehs.queueEvent(wfEvent);
+     }
+     assertEquals(10, eventQ.getCurrentSize());
+
+     // test a single worker polling from the queue
+     int numThreads = 1;
+     Thread[] thread = new Thread[numThreads];
+     for (int i = 0; i < numThreads; i++) {
+         thread[i] = new Thread(ehs.new EventWorker());
+         thread[i].run();
+     }
+     // n(events) - n(batch) i.e. 10 - 3 = 7
+     assertEquals(7, eventQ.getCurrentSize());
+
+     // restore events count to 10
+     for (int i = 0; i < 3; i++) {
+         ehs.queueEvent(wfEvent);
+     }
+     assertEquals(10, eventQ.getCurrentSize());
+     // test two workers polling from the queue, one after the other
+     numThreads = 2;
+     thread = new Thread[numThreads];
+     for (int i = 0; i < numThreads; i++) {
+         thread[i] = new Thread(ehs.new EventWorker());
+         thread[i].run();
+     }
+     // n(events) - n(batch)*n(threads) i.e. 10 - 3*2 = 4
+     assertEquals(4, eventQ.getCurrentSize());
+
+     // enqueue events again
+     for (int i = 0; i < 6; i++) {
+         ehs.queueEvent(wfEvent);
+     }
+     assertEquals(10, eventQ.getCurrentSize());
+     // test the 2 workers draining repeatedly (mimicking SchedulerService)
+     // from queue; stop early once the queue is empty
+     int repetition = 3;
+     for (int r = 0; r < repetition && !eventQ.isEmpty(); r++) {
+         for (int i = 0; i < numThreads; i++) {
+             thread[i].run();
+         }
+     }
+     assertEquals(0, eventQ.getCurrentSize());
+ }
+
+}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java?rev=1466307&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/service/TestEventHandlerService.java Wed Apr 10 00:51:43 2013
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.event.BundleJobEvent;
+import org.apache.oozie.event.CoordinatorActionEvent;
+import org.apache.oozie.event.CoordinatorJobEvent;
+import org.apache.oozie.event.MemoryEventQueue;
+import org.apache.oozie.event.WorkflowActionEvent;
+import org.apache.oozie.event.WorkflowJobEvent;
+import org.apache.oozie.event.listener.JobEventListener;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestEventHandlerService extends XDataTestCase {
+
+ StringBuilder output = new StringBuilder(); // collects the marker strings appended by DummyJobEventListener callbacks
+
+ /**
+  * Creates a new {@link Services} instance, adds the {@link EventHandlerService}
+  * to its extension service classes and initializes it.
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     Services services = new Services();
+     Configuration conf = services.getConf();
+     conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
+     services.init();
+ }
+
+ /**
+  * Destroys the {@link Services} instance created in {@code setUp()} before
+  * delegating to the superclass tear-down.
+  */
+ @Override
+ protected void tearDown() throws Exception {
+     Services.get().destroy();
+     super.tearDown();
+ }
+
+ public void testService() throws Exception {
+ EventHandlerService ehs = _testEventHandlerService();
+ // check default initializations
+ assertTrue(ehs.getEventQueue() instanceof MemoryEventQueue);
+ Set<String> jobtypes = ehs.getAppTypes();
+ assertTrue(jobtypes.contains("workflow_job"));
+ assertTrue(jobtypes.contains("coordinator_action"));
+ }
+
+ public void testEventListener() throws Exception {
+ EventHandlerService ehs = _testEventHandlerService();
+ ehs.addEventListener(new DummyJobEventListener());
+
+ /*
+ * Workflow Job events
+ */
+ WorkflowJobEvent event = new WorkflowJobEvent("jobid", "parentid", WorkflowJob.Status.RUNNING, getTestUser(),
+ "myapp", null, null);
+ ehs.queueEvent(event);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Job STARTED"));
+ output.setLength(0);
+
+ event.setStatus(WorkflowJob.Status.SUSPENDED);
+ ehs.queueEvent(event);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Job SUSPEND"));
+ output.setLength(0);
+
+ event.setStatus(WorkflowJob.Status.SUCCEEDED);
+ ehs.queueEvent(event);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Job SUCCESS"));
+ output.setLength(0);
+
+ event.setStatus(WorkflowJob.Status.KILLED);
+ ehs.queueEvent(event);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Workflow Job FAILURE"));
+ output.setLength(0);
+
+ /*
+ * Coordinator Action events
+ */
+ CoordinatorActionEvent event2 = new CoordinatorActionEvent("jobid", "parentid",
+ CoordinatorAction.Status.WAITING, getTestUser(), "myapp", null, null, null);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action WAITING"));
+ output.setLength(0);
+
+ event2.setStatus(CoordinatorAction.Status.RUNNING);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action START"));
+ output.setLength(0);
+
+ event2.setStatus(CoordinatorAction.Status.SUSPENDED);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action SUSPEND"));
+ output.setLength(0);
+
+ event2.setStatus(CoordinatorAction.Status.SUCCEEDED);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action SUCCESS"));
+ output.setLength(0);
+
+ event2.setStatus(CoordinatorAction.Status.TIMEDOUT);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action FAILURE"));
+ output.setLength(0);
+
+ event2.setStatus(CoordinatorAction.Status.KILLED);
+ ehs.queueEvent(event2);
+ ehs.new EventWorker().run();
+ assertTrue(output.toString().contains("Coord Action FAILURE"));
+ output.setLength(0);
+ }
+
+ /**
+  * Looks up the {@link EventHandlerService} from the current {@link Services}
+  * and asserts that it is present.
+  *
+  * @return the registered event handler service
+  */
+ private EventHandlerService _testEventHandlerService() throws Exception {
+     EventHandlerService handler = Services.get().get(EventHandlerService.class);
+     assertNotNull(handler);
+     return handler;
+ }
+
+ /**
+  * Listener that records every callback by appending a fixed marker string to
+  * the enclosing test's {@code output} buffer. Callbacks invoked with a
+  * {@code null} event append nothing.
+  */
+ class DummyJobEventListener extends JobEventListener {
+
+     /**
+      * Appends {@code marker} to {@code output} when {@code evt} is not null.
+      */
+     private void appendIfPresent(Object evt, String marker) {
+         if (evt == null) {
+             return;
+         }
+         output.append(marker);
+     }
+
+     // ---- workflow job callbacks ----
+
+     @Override
+     public void onWorkflowJobStart(WorkflowJobEvent wje) {
+         appendIfPresent(wje, "Dummy Workflow Job STARTED");
+     }
+
+     @Override
+     public void onWorkflowJobSuccess(WorkflowJobEvent wje) {
+         appendIfPresent(wje, "Dummy Workflow Job SUCCESS");
+     }
+
+     @Override
+     public void onWorkflowJobFailure(WorkflowJobEvent wje) {
+         appendIfPresent(wje, "Dummy Workflow Job FAILURE");
+     }
+
+     @Override
+     public void onWorkflowJobSuspend(WorkflowJobEvent wje) {
+         appendIfPresent(wje, "Dummy Workflow Job SUSPEND");
+     }
+
+     // ---- workflow action callbacks ----
+
+     @Override
+     public void onWorkflowActionStart(WorkflowActionEvent wae) {
+         appendIfPresent(wae, "Dummy Workflow Action START");
+     }
+
+     @Override
+     public void onWorkflowActionSuccess(WorkflowActionEvent wae) {
+         appendIfPresent(wae, "Dummy Workflow Action SUCCESS");
+     }
+
+     @Override
+     public void onWorkflowActionFailure(WorkflowActionEvent wae) {
+         appendIfPresent(wae, "Dummy Workflow Action FAILURE");
+     }
+
+     @Override
+     public void onWorkflowActionSuspend(WorkflowActionEvent wae) {
+         appendIfPresent(wae, "Dummy Workflow Action SUSPEND");
+     }
+
+     // ---- coordinator job callbacks ----
+
+     @Override
+     public void onCoordinatorJobStart(CoordinatorJobEvent cje) {
+         appendIfPresent(cje, "Dummy Coord Job START");
+     }
+
+     @Override
+     public void onCoordinatorJobSuccess(CoordinatorJobEvent cje) {
+         appendIfPresent(cje, "Dummy Coord Job SUCCESS");
+     }
+
+     @Override
+     public void onCoordinatorJobFailure(CoordinatorJobEvent cje) {
+         appendIfPresent(cje, "Dummy Coord Job FAILURE");
+     }
+
+     @Override
+     public void onCoordinatorJobSuspend(CoordinatorJobEvent cje) {
+         appendIfPresent(cje, "Dummy Coord Job SUSPEND");
+     }
+
+     // ---- coordinator action callbacks ----
+
+     @Override
+     public void onCoordinatorActionWaiting(CoordinatorActionEvent cae) {
+         appendIfPresent(cae, "Dummy Coord Action WAITING");
+     }
+
+     @Override
+     public void onCoordinatorActionStart(CoordinatorActionEvent cae) {
+         appendIfPresent(cae, "Dummy Coord Action START");
+     }
+
+     @Override
+     public void onCoordinatorActionSuccess(CoordinatorActionEvent cae) {
+         appendIfPresent(cae, "Dummy Coord Action SUCCESS");
+     }
+
+     @Override
+     public void onCoordinatorActionFailure(CoordinatorActionEvent cae) {
+         appendIfPresent(cae, "Dummy Coord Action FAILURE");
+     }
+
+     @Override
+     public void onCoordinatorActionSuspend(CoordinatorActionEvent cae) {
+         appendIfPresent(cae, "Dummy Coord Action SUSPEND");
+     }
+
+     // ---- bundle job callbacks ----
+
+     @Override
+     public void onBundleJobStart(BundleJobEvent bje) {
+         appendIfPresent(bje, "Dummy Bundle Job START");
+     }
+
+     @Override
+     public void onBundleJobSuccess(BundleJobEvent bje) {
+         appendIfPresent(bje, "Dummy Bundle Job SUCCESS");
+     }
+
+     @Override
+     public void onBundleJobFailure(BundleJobEvent bje) {
+         appendIfPresent(bje, "Dummy Bundle Job FAILURE");
+     }
+
+     @Override
+     public void onBundleJobSuspend(BundleJobEvent bje) {
+         appendIfPresent(bje, "Dummy Bundle Job SUSPEND");
+     }
+
+     // ---- lifecycle: nothing to set up or release ----
+
+     @Override
+     public void init() {
+     }
+
+     @Override
+     public void destroy() {
+     }
+
+ }
+
+}
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1466307&r1=1466306&r2=1466307&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Apr 10 00:51:43 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1209 Event generation and handling for workflow and coordinator (mona)
OOZIE-1118 improve logic of purge service (rkanter)
OOZIE-1205 If the JobTracker is restarted during a Fork, Oozie doesn't fail all of the currently running actions (rkanter)
OOZIE-1286 SSH Action does not properly handle arguments that have spaces (rkanter)