You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by sh...@apache.org on 2014/07/24 09:03:20 UTC
git commit: OOZIE-1846 Convert CoordActionMaterializeCommand to an
XCommand and remove Command (seoeun25 via shwethags)
Repository: oozie
Updated Branches:
refs/heads/master 12ef61470 -> 313652559
OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/31365255
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/31365255
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/31365255
Branch: refs/heads/master
Commit: 313652559bc43f3c89c1f76c4d00c0529314213c
Parents: 12ef614
Author: Shwetha GS <sh...@inmobi.com>
Authored: Thu Jul 24 12:04:01 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Thu Jul 24 12:06:58 2014 +0530
----------------------------------------------------------------------
.../java/org/apache/oozie/command/Command.java | 594 -------------------
.../apache/oozie/command/CommandException.java | 2 +-
.../coord/CoordActionMaterializeCommand.java | 375 ------------
.../CoordMaterializeTransitionXCommand.java | 12 +-
.../command/coord/CoordSubmitXCommand.java | 18 +-
.../oozie/command/coord/CoordinatorCommand.java | 47 --
.../service/CoordMaterializeTriggerService.java | 2 +-
.../org/apache/oozie/command/TestCommand.java | 214 -------
.../TestCoordActionMaterializeCommand.java | 323 ----------
.../command/coord/TestCoordELExtensions.java | 4 +-
.../TestCoordMaterializeTransitionXCommand.java | 16 +-
pom.xml | 2 +-
release-log.txt | 1 +
13 files changed, 42 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/Command.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/Command.java b/core/src/main/java/org/apache/oozie/command/Command.java
deleted file mode 100644
index 0e51b8e..0000000
--- a/core/src/main/java/org/apache/oozie/command/Command.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.FaultInjection;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.XException;
-import org.apache.oozie.service.CallableQueueService;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.InstrumentationService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.service.XLogService;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.util.InstrumentUtils;
-import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XCallable;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.lock.LockToken;
-
-/**
- * Base class for all synchronous and asynchronous DagEngine commands.
- */
-public abstract class Command<T, S extends Store> implements XCallable<T> {
- /**
- * The instrumentation group used for Commands.
- */
- private static final String INSTRUMENTATION_GROUP = "commands";
-
- private final long createdTime;
-
- /**
- * The instrumentation group used for Jobs.
- */
- private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
-
- private static final long LOCK_TIMEOUT = 1000;
- protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
-
- protected Instrumentation instrumentation;
- private List<XCallable<Void>> callables;
- private List<XCallable<Void>> delayedCallables;
- private long delay = 0;
- private List<XCallable<Void>> exceptionCallables;
- private String name;
- private String type;
- private String key;
- private int priority;
- private int logMask;
- private boolean withStore;
- protected boolean dryrun = false;
- private ArrayList<LockToken> locks = null;
-
- /**
- * This variable is package private for testing purposes only.
- */
- XLog.Info logInfo;
-
- /**
- * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
- * captured for execution.
- *
- * @param name command name.
- * @param type command type.
- * @param priority priority of the command, used when queuing for asynchronous execution.
- * @param logMask log mask for the command logging calls.
- */
- public Command(String name, String type, int priority, int logMask) {
- this(name, type, priority, logMask, true);
- }
-
- /**
- * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
- *
- * @param name command name.
- * @param type command type.
- * @param priority priority of the command, used when queuing for asynchronous execution.
- * @param logMask log mask for the command logging calls.
- * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
- */
- public Command(String name, String type, int priority, int logMask, boolean withStore) {
- this.name = ParamChecker.notEmpty(name, "name");
- this.type = ParamChecker.notEmpty(type, "type");
- this.key = name + "_" + UUID.randomUUID();
- this.priority = priority;
- this.withStore = withStore;
- this.logMask = logMask;
- instrumentation = Services.get().get(InstrumentationService.class).get();
- logInfo = new XLog.Info(XLog.Info.get());
- createdTime = System.currentTimeMillis();
- locks = new ArrayList<LockToken>();
- }
-
- /**
- * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
- *
- * @param name command name.
- * @param type command type.
- * @param priority priority of the command, used when queuing for asynchronous execution.
- * @param logMask log mask for the command logging calls.
- * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
- * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
- * really submitting the job
- */
- public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
- this(name, type, priority, logMask, withStore);
- this.dryrun = dryrun;
- }
-
- /**
- * Return the name of the command.
- *
- * @return the name of the command.
- */
- @Override
- public String getName() {
- return name;
- }
-
- /**
- * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
- * org.apache.oozie.service.CallableQueueService}.
- *
- * @return the callable type.
- */
- @Override
- public String getType() {
- return type;
- }
-
- /**
- * Return the priority of the command.
- *
- * @return the priority of the command.
- */
- @Override
- public int getPriority() {
- return priority;
- }
-
- /**
- * Returns the createdTime of the callable in milliseconds
- *
- * @return the callable createdTime
- */
- @Override
- public long getCreatedTime() {
- return createdTime;
- }
-
- /**
- * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
- * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
- * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
- * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
- * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
- * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
- * commands queued for exception will be effectively queued fro execution..
- *
- * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
- * without committing, thus doing a rollback.
- */
- @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
- public final T call() throws CommandException {
- XLog.Info.get().setParameters(logInfo);
- XLog log = XLog.getLog(getClass());
- log.trace(logMask, "Start");
- Instrumentation.Cron cron = new Instrumentation.Cron();
- cron.start();
- callables = new ArrayList<XCallable<Void>>();
- delayedCallables = new ArrayList<XCallable<Void>>();
- exceptionCallables = new ArrayList<XCallable<Void>>();
- delay = 0;
- S store = null;
- boolean exception = false;
-
- try {
- if (withStore) {
- store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
- store.beginTrx();
- }
- T result = execute(store);
- /*
- *
- * if (store != null && log != null) { log.info(XLog.STD,
- * "connection log from store Flush Mode {0} ",
- * store.getFlushMode()); }
- */
- if (withStore) {
- if (store == null) {
- throw new IllegalStateException("WorkflowStore should not be null");
- }
- if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
- throw new RuntimeException("Skipping Commit for Failover Testing");
- }
- store.commitTrx();
- }
-
- // TODO figure out the reject due to concurrency problems and remove
- // the delayed queuing for callables.
- boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
- if (ret == false) {
- logQueueCallableFalse(callables);
- }
-
- ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
- if (ret == false) {
- logQueueCallableFalse(delayedCallables);
- }
-
- return result;
- }
- catch (XException ex) {
- log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
- if (store != null) {
- log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
- .isClosed());
- }
- exception = true;
- if (store != null && store.isActive()) {
- try {
- store.rollbackTrx();
- }
- catch (RuntimeException rex) {
- log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
- }
- }
-
- // TODO figure out the reject due to concurrency problems and remove
- // the delayed queuing for callables.
- boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
- if (ret == false) {
- logQueueCallableFalse(exceptionCallables);
- }
- if (ex instanceof CommandException) {
- throw (CommandException) ex;
- }
- else {
- throw new CommandException(ex);
- }
- }
- catch (Exception ex) {
- log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
- exception = true;
- if (store != null && store.isActive()) {
- try {
- store.rollbackTrx();
- }
- catch (RuntimeException rex) {
- log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
- }
- }
- throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
- }
- catch (Error er) {
- log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
- exception = true;
- if (store != null && store.isActive()) {
- try {
- store.rollbackTrx();
- }
- catch (RuntimeException rex) {
- log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
- }
- }
- throw er;
- }
- finally {
- FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
- cron.stop();
- instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
- InstrumentUtils.incrCommandCounter(name, 1, instrumentation);
- log.trace(logMask, "End");
- if (locks != null) {
- for (LockToken lock : locks) {
- lock.release();
- }
- locks.clear();
- }
- if (store != null) {
- if (!store.isActive()) {
- try {
- store.closeTrx();
- }
- catch (RuntimeException rex) {
- if (exception) {
- log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
- }
- else {
- throw rex;
- }
- }
- }
- else {
- log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
- }
- }
- }
- }
-
- /**
- * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
- * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
- * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
- * are not queued for execution.
- *
- * @param callable callable to queue for execution.
- */
- protected void queueCallable(XCallable<Void> callable) {
- callables.add(callable);
- }
-
- /**
- * Queue a list of callables for execution after the current callable call invocation completes and the {@link
- * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
- * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
- * discarded, they are not queued for execution.
- *
- * @param callables list of callables to queue for execution.
- */
- protected void queueCallable(List<? extends XCallable<Void>> callables) {
- this.callables.addAll(callables);
- }
-
- /**
- * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
- * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
- * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
- * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
- * execution.
- *
- * @param callable callable to queue for delayed execution.
- * @param delay the queue delay in milliseconds
- */
- protected void queueCallable(XCallable<Void> callable, long delay) {
- this.delayedCallables.add(callable);
- this.delay = Math.max(this.delay, delay);
- }
-
- /**
- * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
- * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
- * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
- * serial execution.
- *
- * @param callable callable to queue for execution in the case of an exception.
- */
- protected void queueCallableForException(XCallable<Void> callable) {
- exceptionCallables.add(callable);
- }
-
- /**
- * Logging the info if failed to queue the callables.
- *
- * @param callables
- */
- protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
- StringBuilder sb = new StringBuilder(
- "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
- int size = callables.size();
- for (int i = 0; i < size; i++) {
- XCallable<Void> callable = callables.get(i);
- sb.append(callable.getName());
- if (i < size - 1) {
- sb.append(", ");
- }
- else {
- sb.append("]");
- }
- }
- XLog.getLog(getClass()).warn(sb.toString());
- }
-
- /**
- * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
- * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
- * is rolledback.
- *
- * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
- * store.
- * @return the return value of the callable.
- * @throws StoreException thrown if the workflow store could not perform an operation.
- * @throws CommandException thrown if the command could not perform its operation.
- */
- protected abstract T call(S store) throws StoreException, CommandException;
-
- // to do
- // need to implement on all sub commands and break down the transactions
-
- // protected abstract T execute(String id) throws CommandException;
-
- /**
- * Command subclasses must implement this method correct Store can be passed to call(store);
- *
- * @return the Store class for use by Callable
- * @throws CommandException thrown if the command could not perform its operation.
- */
- protected abstract Class<? extends Store> getStoreClass();
-
- /**
- * Set the log info with the context of the given coordinator bean.
- *
- * @param cBean coordinator bean.
- */
- protected void setLogInfo(CoordinatorJobBean cBean) {
- if (logInfo.getParameter(XLogService.GROUP) == null) {
- logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
- }
- if (logInfo.getParameter(XLogService.USER) == null) {
- logInfo.setParameter(XLogService.USER, cBean.getUser());
- }
- logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
- logInfo.setParameter(DagXLogInfoService.TOKEN, "");
- logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
- XLog.Info.get().setParameters(logInfo);
- }
-
- /**
- * Set the log info with the context of the given coordinator action bean.
- *
- * @param action action bean.
- */
- protected void setLogInfo(CoordinatorActionBean action) {
- logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
- // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
- logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
- XLog.Info.get().setParameters(logInfo);
- }
-
- /**
- * Set the log info with the context of the given workflow bean.
- *
- * @param workflow workflow bean.
- */
- protected void setLogInfo(WorkflowJobBean workflow) {
- if (logInfo.getParameter(XLogService.GROUP) == null) {
- logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
- }
- if (logInfo.getParameter(XLogService.USER) == null) {
- logInfo.setParameter(XLogService.USER, workflow.getUser());
- }
- logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
- logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
- logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
- XLog.Info.get().setParameters(logInfo);
- }
-
- /**
- * Set the log info with the context of the given action bean.
- *
- * @param action action bean.
- */
- protected void setLogInfo(WorkflowActionBean action) {
- logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
- logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
- logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
- XLog.Info.get().setParameters(logInfo);
- }
-
- /**
- * Reset the action bean information from the log info.
- */
- // TODO check if they are used, else delete
- protected void resetLogInfoAction() {
- logInfo.clearParameter(DagXLogInfoService.ACTION);
- XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
- }
-
- /**
- * Reset the workflow bean information from the log info.
- */
- // TODO check if they are used, else delete
- protected void resetLogInfoWorkflow() {
- logInfo.clearParameter(DagXLogInfoService.JOB);
- logInfo.clearParameter(DagXLogInfoService.APP);
- logInfo.clearParameter(DagXLogInfoService.TOKEN);
- XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
- XLog.Info.get().clearParameter(DagXLogInfoService.APP);
- XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
- }
-
- /**
- * Return the {@link Instrumentation} instance in use.
- *
- * @return the {@link Instrumentation} instance in use.
- */
- protected Instrumentation getInstrumentation() {
- return instrumentation;
- }
-
- /**
- * Return the identity.
- *
- * @return the identity.
- */
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(getType());
- sb.append(",").append(getPriority());
- return sb.toString();
- }
-
- protected boolean lock(String id) throws InterruptedException {
- if (id == null || id.length() == 0) {
- XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
- return false;
- }
- LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
- if (token != null) {
- locks.add(token);
- return true;
- }
- else {
- return false;
- }
- }
-
- /*
- * TODO - remove store coupling to EM. Store will only contain queries
- * protected EntityManager getEntityManager() { return
- * store.getEntityManager(); }
- */
- protected T execute(S store) throws CommandException, StoreException {
- T result = call(store);
- return result;
- }
-
- /**
- * Get command key
- *
- * @return command key
- */
- @Override
- public String getKey(){
- return this.key;
- }
-
- /**
- * Get command lock key returning the key as an entity key, [not used] Just
- * to be able to implement XCallable [to be deprecated]
- *
- * @return key
- */
- @Override
- public String getEntityKey() {
- return this.key;
- }
-
- /**
- * set the mode of execution for the callable. True if in interrupt, false
- * if not [to be deprecated]
- */
- public void setInterruptMode(boolean mode) {
- }
-
- /**
- * [to be deprecated]
- *
- * @return the mode of execution. true if it is executed as an Interrupt,
- * false otherwise
- */
- public boolean inInterruptMode() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/CommandException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/CommandException.java b/core/src/main/java/org/apache/oozie/command/CommandException.java
index ce5ef54..3740bc8 100644
--- a/core/src/main/java/org/apache/oozie/command/CommandException.java
+++ b/core/src/main/java/org/apache/oozie/command/CommandException.java
@@ -21,7 +21,7 @@ import org.apache.oozie.XException;
import org.apache.oozie.ErrorCode;
/**
- * Exception thrown by {@link Command}s.
+ * Exception thrown by {@link XCommand}s.
*/
public class CommandException extends XException {
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
deleted file mode 100644
index 14dee97..0000000
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.AppType;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.SLAEventBean;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.SLAEvent.SlaAppType;
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.sla.SLAOperations;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.db.SLADbOperations;
-import org.jdom.Element;
-
-@SuppressWarnings("deprecation")
-public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> {
- private String jobId;
- private Date startTime;
- private Date endTime;
- private int lastActionNumber = 1; // over-ride by DB value
- private final XLog log = XLog.getLog(getClass());
- private String user;
- private String group;
- private List<JsonBean> insertList = new ArrayList<JsonBean>();
- private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
-
- /**
- * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
- */
- public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
-
- public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
- super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false);
- this.jobId = jobId;
- this.startTime = startTime;
- this.endTime = endTime;
- }
-
- @Override
- protected Void call(CoordinatorStore store) throws CommandException {
- CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId);
- CoordinatorJobBean job;
- try {
- job = Services.get().get(JPAService.class).execute(getCoordJob);
- }
- catch (JPAExecutorException jex) {
- throw new CommandException(jex);
- }
- setLogInfo(job);
- if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
- log.info("ENDED Coordinator materialization for jobId = " + jobId
- + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr());
- return null;
- }
-
- if (endTime.after(job.getEndTime())) {
- log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime
- + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr());
- return null;
- }
-
- if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) {
- log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime
- + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr());
- // pausetime blocks real materialization - we change job's status back to RUNNING;
- if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
- job.setStatus(CoordinatorJob.Status.RUNNING);
- }
- updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job));
- return null;
- }
-
- this.user = job.getUser();
- this.group = job.getGroup();
-
- if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) {
- Configuration jobConf = null;
- log.debug("start job :" + jobId + " Materialization ");
- try {
- jobConf = new XConfiguration(new StringReader(job.getConf()));
- }
- catch (IOException ioe) {
- log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe);
- throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
- }
-
- try {
- materializeJobs(false, job, jobConf, store);
- updateJobTable(job, store);
- }
- catch (CommandException ex) {
- log.warn("Exception occurs:" + ex + " Making the job failed ");
- job.setStatus(CoordinatorJobBean.Status.FAILED);
- updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job));
- }
- catch (Exception e) {
- log.error("Excepion thrown :", e);
- throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
- }
- }
- else {
- log.info("WARN: action is not in PREMATER state! It's in state=" + job.getStatus());
- }
- return null;
- }
-
- /**
- * Create action instances starting from "start-time" to end-time" and store them into Action table.
- *
- * @param dryrun
- * @param jobBean
- * @param conf
- * @param store
- * @throws Exception
- */
- protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf,
- CoordinatorStore store) throws Exception {
- String jobXml = jobBean.getJobXml();
- Element eJob = XmlUtils.parseXml(jobXml);
- // TODO: always UTC?
- TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone());
- // TimeZone appTz = DateUtils.getTimeZone("UTC");
- int frequency = Integer.valueOf(jobBean.getFrequency());
- TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
- TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
- Calendar start = Calendar.getInstance(appTz);
- start.setTime(startTime);
- DateUtils.moveToEnd(start, endOfFlag);
- Calendar end = Calendar.getInstance(appTz);
- end.setTime(endTime);
- lastActionNumber = jobBean.getLastActionNumber();
- // DateUtils.moveToEnd(end, endOfFlag);
- log.info(" *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime()
- + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency
- + ":" + freqTU + " lastActionNumber " + lastActionNumber);
- // Keep the actual start time
- Calendar origStart = Calendar.getInstance(appTz);
- origStart.setTime(jobBean.getStartTimestamp());
- // Move to the End of duration, if needed.
- DateUtils.moveToEnd(origStart, endOfFlag);
- // Cloning the start time to be used in loop iteration
- Calendar effStart = (Calendar) origStart.clone();
- // Move the time when the previous action finished
- effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
-
- String action = null;
- StringBuilder actionStrings = new StringBuilder();
- Date jobPauseTime = jobBean.getPauseTime();
- Calendar pause = null;
- if (jobPauseTime != null) {
- pause = Calendar.getInstance(appTz);
- pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
- }
-
- while (effStart.compareTo(end) < 0) {
- if (pause != null && effStart.compareTo(pause) >= 0) {
- break;
- }
- CoordinatorActionBean actionBean = new CoordinatorActionBean();
- lastActionNumber++;
-
- int timeout = jobBean.getTimeout();
- log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime()
- + ", lastactionnumber=" + lastActionNumber);
- Date actualTime = new Date();
- action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
- effStart.getTime(), actualTime, lastActionNumber, conf, actionBean);
- int catchUpTOMultiplier = 1; // This value might be could be changed in future
- if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) {
- // Catchup action
- timeout = catchUpTOMultiplier * timeout;
- // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP,
- // -1));
- log.info("Catchup timeout is :" + actionBean.getTimeOut());
- }
- actionBean.setTimeOut(timeout);
-
- if (!dryrun) {
- storeToDB(actionBean, action, store, jobBean.getAppName()); // Storing to table
- }
- else {
- actionStrings.append("action for new instance");
- actionStrings.append(action);
- }
- // Restore the original start time
- effStart = (Calendar) origStart.clone();
- effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
- }
-
- endTime = new Date(effStart.getTimeInMillis());
- if (!dryrun) {
- return action;
- }
- else {
- return actionStrings.toString();
- }
- }
-
- /**
- * Store an Action into database table.
- *
- * @param actionBean
- * @param actionXml
- * @param store
- * @param appName
- * @throws Exception
- */
- private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store, String appName)
- throws Exception {
- log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
- actionBean.setActionXml(actionXml);
- insertList.add(actionBean);
- createActionRegistration(actionXml, actionBean, store, appName);
-
- // TODO: time 100s should be configurable
- queueCallable(new CoordActionNotificationXCommand(actionBean), 100);
- queueCallable(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
- }
-
- /**
- * @param actionXml
- * @param actionBean
- * @param store
- * @param appName
- * @throws Exception
- */
- private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store,
- String appName) throws Exception {
- Element eAction = XmlUtils.parseXml(actionXml);
- Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
- SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(),
- SlaAppType.COORDINATOR_ACTION, user, group);
- if(slaEvent != null) {
- insertList.add(slaEvent);
- }
- // insert into new sla reg table too
- SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
- AppType.COORDINATOR_ACTION, user, appName, log, false);
- }
-
- /**
- * @param job
- * @param store
- * @throws StoreException
- */
- private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) {
- // TODO: why do we need this? Isn't lastMatTime enough???
- job.setLastActionTime(endTime);
- job.setLastActionNumber(lastActionNumber);
- // if the job endtime == action endtime, then set status of job to
- // succeeded
- // we dont need to materialize this job anymore
- Date jobEndTime = job.getEndTime();
- if (jobEndTime.compareTo(endTime) <= 0) {
- job.setStatus(CoordinatorJob.Status.SUCCEEDED);
- log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED");
- }
- else {
- job.setStatus(CoordinatorJob.Status.RUNNING);
- log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
- }
- job.setNextMaterializedTime(endTime);
- updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job));
- }
-
- @Override
- protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
- log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime="
- + endTime);
- try {
- if (lock(jobId)) {
- call(store);
- JPAService jpaService = Services.get().get(JPAService.class);
- if (jpaService != null) {
- try {
- BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
- }
- catch (JPAExecutorException je) {
- throw new CommandException(je);
- }
- }
- else {
- throw new CommandException(ErrorCode.E0610);
- }
- }
- else {
- queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),
- LOCK_FAILURE_REQUEUE_INTERVAL);
- log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId
- + ". Requeing the same.");
- }
- }
- catch (InterruptedException e) {
- queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL);
- log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage()
- + " for jobId=" + jobId + " Requeing the same.");
- }
- finally {
- log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime
- + ", endTime=" + endTime);
- }
- return null;
- }
-
-
-
- /**
- * For preliminery testing. Should be removed soon
- *
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- new Services().init();
- try {
- Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z");
- Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z");
- String jobId = "0000000-091207151850551-oozie-dani-C";
- CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime);
- matCmd.call();
- }
- finally {
- try {
- Thread.sleep(60000);
- }
- catch (Exception ex) {
- }
- new Services().destroy();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 23bafb8..b4b2fef 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -101,6 +101,16 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
this.materializationWindow = materializationWindow;
}
+ public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime,
+ Date endTime) {
+ super("coord_mater", "coord_mater", 1);
+ this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId");
+ this.materializationWindow = materializationWindow;
+ this.coordJob = coordJob;
+ this.startMatdTime = startTime;
+ this.endMatdTime = endTime;
+ }
+
/* (non-Javadoc)
* @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
*/
@@ -412,7 +422,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
String action = null;
- int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
+ int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
// If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
boolean ignoreMaxActions =
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index 11fde6f..02b30ef 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -56,6 +56,7 @@ import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.HadoopAccessorService;
@@ -287,14 +288,16 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
/**
* Gets the dryrun output.
*
- * @param jobId the job id
+ * @param coordJob the coordinatorJobBean
* @return the dry run
* @throws Exception the exception
*/
protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{
+ int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW,
+ CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT);
Date startTime = coordJob.getStartTime();
long startTimeMilli = startTime.getTime();
- long endTimeMilli = startTimeMilli + (3600 * 1000);
+ long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
Date jobEndTime = coordJob.getEndTime();
Date endTime = new Date(endTimeMilli);
if (endTime.compareTo(jobEndTime) > 0) {
@@ -304,8 +307,6 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
LOG.info("[" + jobId + "]: Update status to RUNNING");
coordJob.setStatus(Job.Status.RUNNING);
coordJob.setPending();
- CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
- endTime);
Configuration jobConf = null;
try {
jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
@@ -313,7 +314,8 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
catch (IOException e1) {
LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
}
- String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
+ String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime,
+ endTime).materializeActions(true);
String output = coordJob.getJobXml() + System.getProperty("line.separator")
+ "***actions for instance***" + action;
return output;
@@ -323,9 +325,9 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
* Queue MaterializeTransitionXCommand
*/
protected void queueMaterializeTransitionXCommand(String jobId) {
- // submit a command to materialize jobs for the next 1 hour (3600 secs)
- // so we don't wait 10 mins for the Service to run.
- queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
+ int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW,
+ CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT);
+ queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100);
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
deleted file mode 100644
index c70e171..0000000
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import org.apache.oozie.command.Command;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.store.WorkflowStore;
-
-public abstract class CoordinatorCommand<T> extends Command<T, CoordinatorStore> {
-
- public CoordinatorCommand(String name, String type, int priority, int logMask) {
- super(name, type, priority, logMask);
- }
-
- public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore) {
- super(name, type, priority, logMask, withStore);
- }
-
- public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
- super(name, type, priority, logMask, (dryrun) ? false : withStore, dryrun);
- }
-
- /**
- * Return the public interface of the Coordinator Store.
- *
- * @return {@link WorkflowStore}
- */
- public Class<? extends Store> getStoreClass() {
- return CoordinatorStore.class;
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 1dac7e8..3fbd092 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -61,7 +61,7 @@ public class CoordMaterializeTriggerService implements Service {
private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
- private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
+ public static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/TestCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/TestCommand.java b/core/src/test/java/org/apache/oozie/command/TestCommand.java
deleted file mode 100644
index 60363bf..0000000
--- a/core/src/test/java/org/apache/oozie/command/TestCommand.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.XCallable;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.ErrorCode;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-public class TestCommand extends XTestCase {
- private static List<String> EXECUTED = Collections.synchronizedList(new ArrayList<String>());
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- new Services().init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- Services.get().destroy();
- super.tearDown();
- }
-
- private static class DummyXCallable implements XCallable<Void> {
- private String name;
- private String key = null;
-
- public DummyXCallable(String name) {
- this.name = name;
- this.key = name + "_" + UUID.randomUUID();
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public String getType() {
- return "type";
- }
-
- @Override
- public int getPriority() {
- return 0;
- }
-
- @Override
- public long getCreatedTime() {
- return 1;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Type:").append(getType());
- sb.append(",Priority:").append(getPriority());
- return sb.toString();
- }
-
- @Override
- public void setInterruptMode(boolean mode) {
- }
-
- @Override
- public boolean inInterruptMode() {
- return false;
- }
-
- public Void call() throws Exception {
- EXECUTED.add(name);
- return null;
- }
-
- @Override
- public String getKey() {
- return this.key;
- }
-
- @Override
- public String getEntityKey() {
- return null;
- }
-
- }
-
- private static class MyCommand extends Command<Object, WorkflowStore> {
- private boolean exception;
-
- public MyCommand(boolean exception) {
- super("test", "test", 1, XLog.OPS);
- this.exception = exception;
- }
-
- @Override
- protected Object call(WorkflowStore store) throws StoreException, CommandException {
- assertTrue(logInfo.createPrefix().contains("JOB[job]"));
- assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]"));
- assertTrue(logInfo.createPrefix().contains("ACTION[action]"));
- assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
- assertNotNull(store);
- assertEquals("test", getName());
- assertEquals(1, getPriority());
- queueCallable(new DummyXCallable("a"));
- queueCallable(Arrays.asList(new DummyXCallable("b"), new DummyXCallable("c")));
- queueCallable(new DummyXCallable("d"), 300);
- queueCallable(new DummyXCallable("e"), 200);
- queueCallable(new DummyXCallable("f"), 100);
- queueCallableForException(new DummyXCallable("ex"));
- if (exception) {
- throw new CommandException(ErrorCode.E0800);
- }
- return null;
- }
-
- /**
- * Return the public interface of the Workflow Store.
- *
- * @return {@link WorkflowStore}
- */
- @Override
- public Class<? extends Store> getStoreClass() {
- return WorkflowStore.class;
- }
- }
-
- public void testDagCommand() throws Exception {
- XLog.Info.get().clear();
- XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job");
- XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action");
-
- Command command = new MyCommand(false);
-
- XLog.Info.get().clear();
- command.call();
-
- assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]"));
- assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
- command.resetLogInfoWorkflow();
- assertTrue(XLog.Info.get().createPrefix().contains("JOB[-]"));
- assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
- command.resetLogInfoAction();
- assertTrue(XLog.Info.get().createPrefix().contains("ACTION[-]"));
-
- waitFor(2000, new Predicate() {
- public boolean evaluate() throws Exception {
- return EXECUTED.size() == 6;
- }
- });
-
- assertEquals(6, EXECUTED.size());
- assertEquals(Arrays.asList("a", "b", "c", "d", "e", "f"), EXECUTED);
-
- EXECUTED.clear();
-
- XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job");
- XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action");
- command = new MyCommand(true);
-
- try {
- command.call();
- fail();
- }
- catch (CommandException ex) {
- //nop
- }
-
- waitFor(200, new Predicate() {
- public boolean evaluate() throws Exception {
- return EXECUTED.size() == 2;
- }
- });
-
- assertEquals(1, EXECUTED.size());
- assertEquals(Arrays.asList("ex"), EXECUTED);
- }
-
- /**
- * Return the public interface of the Workflow Store.
- *
- * @return {@link WorkflowStore}
- */
- public Class<? extends Store> getStoreClass() {
- return WorkflowStore.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
deleted file mode 100644
index 6278515..0000000
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.SLAEventBean;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.SLAStore;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.DateUtils;
-
-@SuppressWarnings("deprecation")
-public class TestCoordActionMaterializeCommand extends XTestCase {
- private Services services;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- services = new Services();
- services.init();
- }
-
- @Override
- protected void tearDown() throws Exception {
- services.destroy();
- super.tearDown();
- }
-
- public void testActionMater() throws Exception {
- String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
- String startTimeStr = "2009-03-06T10:00Z";
- Date startTime = DateUtils.parseDateOozieTZ(startTimeStr);
- Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
- addRecordToJobTable(jobId, startTime, endTime);
- new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
- CoordinatorActionBean action = checkCoordAction(jobId + "@1");
- assertEquals(action.getActionNumber(), 1);
- assertEquals(action.getNominalTime(), startTime);
- assertTrue(action.getMissingDependencies().indexOf("file:///tmp/coord/workflows/2009/03/01") > -1);
- String actionXml = action.getActionXml();
- String slaNotifMsg = actionXml.substring(actionXml.indexOf("<sla:notification-msg>") + 22,
- actionXml.indexOf("</sla:notification-msg>"));
- String expectedSlaMsg = "Notifying User for 2009-03-06T10:00Z,"
- + DateUtils.formatDateOozieTZ(action.getCreatedTime()) + ",2009-03-06,2009-03-07T10:00Z,"
- + action.getId() + ",NAME,testValue,testUser,"
- + "file:///tmp/coord/workflows/2009/22myOutputDatabase,myOutputTable,'datestamp=20090306'";
- assertEquals(expectedSlaMsg, slaNotifMsg);
- }
-
- public void testActionMaterWithPauseTime1() throws Exception {
- String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
- Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
- Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
- Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
- addRecordToJobTable(jobId, startTime, endTime, pauseTime);
- new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
- checkCoordActions(jobId, 1, null);
- }
-
- public void testActionMaterWithPauseTime2() throws Exception {
- String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
- Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
- Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
- Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
- addRecordToJobTable(jobId, startTime, endTime, pauseTime);
- new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
- checkCoordActions(jobId, 2, null);
- }
-
- public void testActionMaterWithPauseTime3() throws Exception {
- String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
- Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
- Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
- Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
- addRecordToJobTable(jobId, startTime, endTime, pauseTime);
- new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
- checkCoordActions(jobId, 0, CoordinatorJob.Status.RUNNING);
- }
-
- private void addRecordToJobTable(String jobId, Date startTime, Date endTime) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- CoordinatorJobBean coordJob = new CoordinatorJobBean();
- coordJob.setId(jobId);
- coordJob.setAppName("testApp");
- coordJob.setStartTime(startTime);
- coordJob.setEndTime(endTime);
- coordJob.setTimeUnit(Timeunit.DAY);
- coordJob.setAppPath("testAppPath");
- coordJob.setStatus(CoordinatorJob.Status.PREMATER);
- coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
- coordJob.setLastModifiedTime(new Date());
- coordJob.setUser("testUser");
- coordJob.setGroup("testGroup");
- coordJob.setTimeZone("America/Los_Angeles");
- String confStr = "<configuration><property><name>testProperty</name><value>testValue</value></property>"
- + "<property><name>user.name</name><value>testUser</value></property></configuration>";
- coordJob.setConf(confStr);
- String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"1\" start='2009-03-06T010:00Z' end='2009-03-11T10:00Z' timezone='America/Los_Angeles' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<controls>";
- appXml += "<timeout>10</timeout>";
- appXml += "<concurrency>2</concurrency>";
- appXml += "<execution>LIFO</execution>";
- appXml += "</controls>";
- appXml += "<input-events>";
- appXml += "<data-in name='A' dataset='a'>";
- appXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${MONTH}/${DAY}</uri-template>";
- appXml += "</dataset>";
- appXml += "<instance>${coord:current(0)}</instance>";
- appXml += "<instance>${coord:latest(-1)}</instance>";
- appXml += "</data-in>";
- appXml += "</input-events>";
- appXml += "<output-events>";
- appXml += "<data-out name='LOCAL_A' dataset='local_a'>";
- appXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
- appXml += "</dataset>";
- appXml += "<instance>${coord:current(-1)}</instance>";
- appXml += "</data-out>";
- appXml += "<data-out name='aggregated-logs' dataset='Stats'>";
- appXml += "<dataset name='Stats' frequency='1' initial-instance='2009-01-01T01:00Z' ";
- appXml += "timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
- appXml += "<uri-template>hcat://foo:11002/myOutputDatabase/myOutputTable/datestamp=${YEAR}${MONTH}${DAY}";
- appXml += "</uri-template></dataset>";
- appXml += "<instance>${coord:current(0)}</instance>";
- appXml += "</data-out>";
- appXml += "</output-events>";
- appXml += "<action>";
- appXml += "<workflow>";
- appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
- appXml += "<configuration>";
- appXml += "<property>";
- appXml += "<name>inputA</name>";
- appXml += "<value>${coord:dataIn('A')}</value>";
- appXml += "</property>";
- appXml += "<property>";
- appXml += "<name>inputB</name>";
- appXml += "<value>${coord:dataOut('LOCAL_A')}</value>";
- appXml += "</property>";
- appXml += "</configuration>";
- appXml += "</workflow>";
- appXml += " <sla:info>"
- + " <sla:app-name>test-app</sla:app-name>"
- + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>"
- + " <sla:should-start>5</sla:should-start>"
- + " <sla:should-end>120</sla:should-end>"
- + " <sla:notification-msg>Notifying User for ${coord:nominalTime()},${coord:actualTime()},"
- + "${coord:formatTime(coord:nominalTime(),'yyyy-MM-dd')},${coord:dateOffset(coord:nominalTime(), 1, 'DAY')},"
- + "${coord:actionId()},${coord:name()},${coord:conf('testProperty')},${coord:user()},${coord:dataOut('LOCAL_A')}"
- + "${coord:databaseOut('aggregated-logs')},${coord:tableOut('aggregated-logs')},"
- + "${coord:dataOutPartitions('aggregated-logs')}"
- + "</sla:notification-msg>" + " <sla:alert-contact>abc@example.com</sla:alert-contact>"
- + " <sla:dev-contact>abc@example.com</sla:dev-contact>"
- + " <sla:qa-contact>abc@example.com</sla:qa-contact>"
- + " <sla:se-contact>abc@example.com</sla:se-contact>" + "</sla:info>";
- appXml += "</action>";
- appXml += "</coordinator-app>";
- /*try {
- System.out.println(XmlUtils.prettyPrint(XmlUtils.parseXml(appXml)));
- ;
- }
- catch (JDOMException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- }*/
- coordJob.setJobXml(appXml);
- coordJob.setLastActionNumber(0);
- coordJob.setFrequency("1");
- try {
- coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-03-11T10:00Z"));
- }
- catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- fail("Could not set end time");
- }
- try {
- store.beginTrx();
- store.insertCoordinatorJob(coordJob);
- store.commitTrx();
- }
- catch (StoreException se) {
- se.printStackTrace();
- store.rollbackTrx();
- fail("Unable to insert the test job record to table");
- throw se;
- }
- finally {
- store.closeTrx();
- }
- }
-
- private CoordinatorActionBean checkCoordAction(String actionId) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- try {
- CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
- SLAStore slaStore = new SLAStore(store);
- long lastSeqId[] = new long[1];
- List<SLAEventBean> slaEvents = slaStore.getSLAEventListNewerSeqLimited(0, 10, lastSeqId);
- // System.out.println("AAA " + slaEvents.size() + " : " +
- // lastSeqId[0]);
- if (slaEvents.size() == 0) {
- fail("Unable to GET any record of sequence id greater than 0");
- }
- return action;
- }
- catch (StoreException se) {
- se.printStackTrace();
- fail("Action ID " + actionId + " was not stored properly in db");
- }
- return null;
- }
-
- private void addRecordToJobTable(String jobId, Date startTime, Date endTime, Date pauseTime) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- CoordinatorJobBean coordJob = new CoordinatorJobBean();
- coordJob.setId(jobId);
- coordJob.setAppName("testApp");
- coordJob.setStartTime(startTime);
- coordJob.setEndTime(endTime);
- coordJob.setPauseTime(pauseTime);
- coordJob.setTimeUnit(Timeunit.MINUTE);
- coordJob.setAppPath("testAppPath");
- coordJob.setStatus(CoordinatorJob.Status.PREMATER);
- coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
- coordJob.setLastModifiedTime(new Date());
- coordJob.setUser("testUser");
- coordJob.setGroup("testGroup");
- coordJob.setTimeZone("America/Los_Angeles");
- String confStr = "<configuration></configuration>";
- coordJob.setConf(confStr);
- String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"5\" start='2009-03-06T010:00Z' end='2009-03-06T10:14Z' timezone='America/Los_Angeles' freq_timeunit='MINUTE' end_of_duration='NONE'>";
- appXml += "<controls>";
- appXml += "<timeout>10</timeout>";
- appXml += "<concurrency>2</concurrency>";
- appXml += "<execution>LIFO</execution>";
- appXml += "</controls>";
- appXml += "<action>";
- appXml += "<workflow>";
- appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
- appXml += "<configuration>";
- appXml += "</configuration>";
- appXml += "</workflow>";
- appXml += " <sla:info>"
- // + " <sla:client-id>axonite-blue</sla:client-id>"
- + " <sla:app-name>test-app</sla:app-name>"
- + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>"
- + " <sla:should-start>5</sla:should-start>"
- + " <sla:should-end>120</sla:should-end>"
- + " <sla:notification-msg>Notifying User for ${coord:nominalTime()} nominal time </sla:notification-msg>"
- + " <sla:alert-contact>abc@example.com</sla:alert-contact>"
- + " <sla:dev-contact>abc@example.com</sla:dev-contact>"
- + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>"
- + "</sla:info>";
- appXml += "</action>";
- appXml += "</coordinator-app>";
-
- coordJob.setJobXml(appXml);
- coordJob.setLastActionNumber(0);
- coordJob.setFrequency("5");
- try {
- store.beginTrx();
- store.insertCoordinatorJob(coordJob);
- store.commitTrx();
- }
- catch (StoreException se) {
- se.printStackTrace();
- store.rollbackTrx();
- fail("Unable to insert the test job record to table");
- throw se;
- }
- finally {
- store.closeTrx();
- }
- }
-
- private void checkCoordActions(String jobId, int number, CoordinatorJob.Status status) throws StoreException {
- CoordinatorStore store = new CoordinatorStore(false);
- try {
- int coordActionsCount = store.getActionsForCoordinatorJob(jobId, false);
- if (coordActionsCount != number) {
- fail("Should have " + number + " actions created for job " + jobId);
- }
-
- if (status != null) {
- CoordinatorJob job = store.getCoordinatorJob(jobId, false);
- if (job.getStatus() != status) {
- fail("Job status " + job.getStatus() + " should be " + status);
- }
- }
- }
- catch (StoreException se) {
- se.printStackTrace();
- fail("Job ID " + jobId + " was not stored properly in db");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
index 7f7a387..aab8efd 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
@@ -55,10 +55,10 @@ public class TestCoordELExtensions extends XDataTestCase {
Date startTime = DateUtils.parseDateUTC("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateUTC("2009-03-07T12:00Z");
CoordinatorJobBean job = createCoordJob("coord-job-for-elext.xml",
- CoordinatorJob.Status.PREMATER, startTime, endTime, false, false, 0);
+ CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
addRecordToCoordJobTable(job);
- new CoordActionMaterializeCommand(job.getId(), startTime, endTime).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
checkCoordAction(job.getId() + "@1");
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 5b22abc..60eff2f 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -31,7 +31,6 @@ import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
@@ -382,6 +381,21 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
checkCoordActions(job.getId(), 0, CoordinatorJob.Status.PAUSED);
}
+ public void testGetDryrun() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
+ CoordinatorJobBean job = createCoordJob(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
+ job.setFrequency("5");
+ job.setTimeUnit(Timeunit.MINUTE);
+ job.setMatThrottling(20);
+ String dryRunOutput = new CoordMaterializeTransitionXCommand(job, 3600, startTime, endTime).materializeActions(true);
+ String[] actions = dryRunOutput.split("action for new instance");
+ assertEquals(3, actions.length -1);
+ for(int i = 1; i < actions.length; i++) {
+ assertTrue(actions[i].contains("action-nominal-time"));
+ }
+ }
+
public void testTimeout() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 190bad7..be5e84a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -836,7 +836,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.12</version>
+ <version>2.12.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9966933..6ec2bcb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,4 +1,5 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags)
OOZIE-1925 upgrade tomcat to 6.0.41 (rkanter via shwethags)
OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)