You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by sh...@apache.org on 2014/07/24 09:03:20 UTC

git commit: OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags)

Repository: oozie
Updated Branches:
  refs/heads/master 12ef61470 -> 313652559


OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/31365255
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/31365255
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/31365255

Branch: refs/heads/master
Commit: 313652559bc43f3c89c1f76c4d00c0529314213c
Parents: 12ef614
Author: Shwetha GS <sh...@inmobi.com>
Authored: Thu Jul 24 12:04:01 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Thu Jul 24 12:06:58 2014 +0530

----------------------------------------------------------------------
 .../java/org/apache/oozie/command/Command.java  | 594 -------------------
 .../apache/oozie/command/CommandException.java  |   2 +-
 .../coord/CoordActionMaterializeCommand.java    | 375 ------------
 .../CoordMaterializeTransitionXCommand.java     |  12 +-
 .../command/coord/CoordSubmitXCommand.java      |  18 +-
 .../oozie/command/coord/CoordinatorCommand.java |  47 --
 .../service/CoordMaterializeTriggerService.java |   2 +-
 .../org/apache/oozie/command/TestCommand.java   | 214 -------
 .../TestCoordActionMaterializeCommand.java      | 323 ----------
 .../command/coord/TestCoordELExtensions.java    |   4 +-
 .../TestCoordMaterializeTransitionXCommand.java |  16 +-
 pom.xml                                         |   2 +-
 release-log.txt                                 |   1 +
 13 files changed, 42 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/Command.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/Command.java b/core/src/main/java/org/apache/oozie/command/Command.java
deleted file mode 100644
index 0e51b8e..0000000
--- a/core/src/main/java/org/apache/oozie/command/Command.java
+++ /dev/null
@@ -1,594 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.FaultInjection;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.XException;
-import org.apache.oozie.service.CallableQueueService;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.InstrumentationService;
-import org.apache.oozie.service.MemoryLocksService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StoreService;
-import org.apache.oozie.service.XLogService;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.util.InstrumentUtils;
-import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.util.XCallable;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.lock.LockToken;
-
-/**
- * Base class for all synchronous and asynchronous DagEngine commands.
- */
-public abstract class Command<T, S extends Store> implements XCallable<T> {
-    /**
-     * The instrumentation group used for Commands.
-     */
-    private static final String INSTRUMENTATION_GROUP = "commands";
-
-    private final long createdTime;
-
-    /**
-     * The instrumentation group used for Jobs.
-     */
-    private static final String INSTRUMENTATION_JOB_GROUP = "jobs";
-
-    private static final long LOCK_TIMEOUT = 1000;
-    protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;
-
-    protected Instrumentation instrumentation;
-    private List<XCallable<Void>> callables;
-    private List<XCallable<Void>> delayedCallables;
-    private long delay = 0;
-    private List<XCallable<Void>> exceptionCallables;
-    private String name;
-    private String type;
-    private String key;
-    private int priority;
-    private int logMask;
-    private boolean withStore;
-    protected boolean dryrun = false;
-    private ArrayList<LockToken> locks = null;
-
-    /**
-     * This variable is package private for testing purposes only.
-     */
-    XLog.Info logInfo;
-
-    /**
-     * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
-     * captured for execution.
-     *
-     * @param name command name.
-     * @param type command type.
-     * @param priority priority of the command, used when queuing for asynchronous execution.
-     * @param logMask log mask for the command logging calls.
-     */
-    public Command(String name, String type, int priority, int logMask) {
-        this(name, type, priority, logMask, true);
-    }
-
-    /**
-     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
-     *
-     * @param name command name.
-     * @param type command type.
-     * @param priority priority of the command, used when queuing for asynchronous execution.
-     * @param logMask log mask for the command logging calls.
-     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
-     */
-    public Command(String name, String type, int priority, int logMask, boolean withStore) {
-        this.name = ParamChecker.notEmpty(name, "name");
-        this.type = ParamChecker.notEmpty(type, "type");
-        this.key = name + "_" + UUID.randomUUID();
-        this.priority = priority;
-        this.withStore = withStore;
-        this.logMask = logMask;
-        instrumentation = Services.get().get(InstrumentationService.class).get();
-        logInfo = new XLog.Info(XLog.Info.get());
-        createdTime = System.currentTimeMillis();
-        locks = new ArrayList<LockToken>();
-    }
-
-    /**
-     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
-     *
-     * @param name command name.
-     * @param type command type.
-     * @param priority priority of the command, used when queuing for asynchronous execution.
-     * @param logMask log mask for the command logging calls.
-     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
-     * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
-     * really submitting the job
-     */
-    public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
-        this(name, type, priority, logMask, withStore);
-        this.dryrun = dryrun;
-    }
-
-    /**
-     * Return the name of the command.
-     *
-     * @return the name of the command.
-     */
-    @Override
-    public String getName() {
-        return name;
-    }
-
-    /**
-     * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
-     * org.apache.oozie.service.CallableQueueService}.
-     *
-     * @return the callable type.
-     */
-    @Override
-    public String getType() {
-        return type;
-    }
-
-    /**
-     * Return the priority of the command.
-     *
-     * @return the priority of the command.
-     */
-    @Override
-    public int getPriority() {
-        return priority;
-    }
-
-    /**
-     * Returns the createdTime of the callable in milliseconds
-     *
-     * @return the callable createdTime
-     */
-    @Override
-    public long getCreatedTime() {
-        return createdTime;
-    }
-
-    /**
-     * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
-     * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
-     * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
-     * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
-     * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
-     * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
-     * commands queued for exception will be effectively queued fro execution..
-     *
-     * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
-     * without committing, thus doing a rollback.
-     */
-    @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
-    public final T call() throws CommandException {
-        XLog.Info.get().setParameters(logInfo);
-        XLog log = XLog.getLog(getClass());
-        log.trace(logMask, "Start");
-        Instrumentation.Cron cron = new Instrumentation.Cron();
-        cron.start();
-        callables = new ArrayList<XCallable<Void>>();
-        delayedCallables = new ArrayList<XCallable<Void>>();
-        exceptionCallables = new ArrayList<XCallable<Void>>();
-        delay = 0;
-        S store = null;
-        boolean exception = false;
-
-        try {
-            if (withStore) {
-                store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
-                store.beginTrx();
-            }
-            T result = execute(store);
-            /*
-             *
-             * if (store != null && log != null) { log.info(XLog.STD,
-             * "connection log from store Flush Mode {0} ",
-             * store.getFlushMode()); }
-             */
-            if (withStore) {
-                if (store == null) {
-                    throw new IllegalStateException("WorkflowStore should not be null");
-                }
-                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
-                    throw new RuntimeException("Skipping Commit for Failover Testing");
-                }
-                store.commitTrx();
-            }
-
-            // TODO figure out the reject due to concurrency problems and remove
-            // the delayed queuing for callables.
-            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
-            if (ret == false) {
-                logQueueCallableFalse(callables);
-            }
-
-            ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
-            if (ret == false) {
-                logQueueCallableFalse(delayedCallables);
-            }
-
-            return result;
-        }
-        catch (XException ex) {
-            log.error(logMask | XLog.OPS, "XException, {0}", ex.getMessage(), ex);
-            if (store != null) {
-                log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
-                        .isClosed());
-            }
-            exception = true;
-            if (store != null && store.isActive()) {
-                try {
-                    store.rollbackTrx();
-                }
-                catch (RuntimeException rex) {
-                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
-                }
-            }
-
-            // TODO figure out the reject due to concurrency problems and remove
-            // the delayed queuing for callables.
-            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
-            if (ret == false) {
-                logQueueCallableFalse(exceptionCallables);
-            }
-            if (ex instanceof CommandException) {
-                throw (CommandException) ex;
-            }
-            else {
-                throw new CommandException(ex);
-            }
-        }
-        catch (Exception ex) {
-            log.error(logMask | XLog.OPS, "Exception, {0}", ex.getMessage(), ex);
-            exception = true;
-            if (store != null && store.isActive()) {
-                try {
-                    store.rollbackTrx();
-                }
-                catch (RuntimeException rex) {
-                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
-                }
-            }
-            throw new CommandException(ErrorCode.E0607, name, ex.getMessage(), ex);
-        }
-        catch (Error er) {
-            log.error(logMask | XLog.OPS, "Error, {0}", er.getMessage(), er);
-            exception = true;
-            if (store != null && store.isActive()) {
-                try {
-                    store.rollbackTrx();
-                }
-                catch (RuntimeException rex) {
-                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
-                }
-            }
-            throw er;
-        }
-        finally {
-            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
-            cron.stop();
-            instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
-            InstrumentUtils.incrCommandCounter(name, 1, instrumentation);
-            log.trace(logMask, "End");
-            if (locks != null) {
-                for (LockToken lock : locks) {
-                    lock.release();
-                }
-                locks.clear();
-            }
-            if (store != null) {
-                if (!store.isActive()) {
-                    try {
-                        store.closeTrx();
-                    }
-                    catch (RuntimeException rex) {
-                        if (exception) {
-                            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
-                        }
-                        else {
-                            throw rex;
-                        }
-                    }
-                }
-                else {
-                    log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
-                }
-            }
-        }
-    }
-
-    /**
-     * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
-     * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
-     * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
-     * are not queued for execution.
-     *
-     * @param callable callable to queue for execution.
-     */
-    protected void queueCallable(XCallable<Void> callable) {
-        callables.add(callable);
-    }
-
-    /**
-     * Queue a list of callables for execution after the current callable call invocation completes and the {@link
-     * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
-     * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
-     * discarded, they are not queued for execution.
-     *
-     * @param callables list of callables to queue for execution.
-     */
-    protected void queueCallable(List<? extends XCallable<Void>> callables) {
-        this.callables.addAll(callables);
-    }
-
-    /**
-     * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
-     * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
-     * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
-     * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
-     * execution.
-     *
-     * @param callable callable to queue for delayed execution.
-     * @param delay the queue delay in milliseconds
-     */
-    protected void queueCallable(XCallable<Void> callable, long delay) {
-        this.delayedCallables.add(callable);
-        this.delay = Math.max(this.delay, delay);
-    }
-
-    /**
-     * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
-     * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
-     * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
-     * serial execution.
-     *
-     * @param callable callable to queue for execution in the case of an exception.
-     */
-    protected void queueCallableForException(XCallable<Void> callable) {
-        exceptionCallables.add(callable);
-    }
-
-    /**
-     * Logging the info if failed to queue the callables.
-     *
-     * @param callables
-     */
-    protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
-        StringBuilder sb = new StringBuilder(
-                "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
-        int size = callables.size();
-        for (int i = 0; i < size; i++) {
-            XCallable<Void> callable = callables.get(i);
-            sb.append(callable.getName());
-            if (i < size - 1) {
-                sb.append(", ");
-            }
-            else {
-                sb.append("]");
-            }
-        }
-        XLog.getLog(getClass()).warn(sb.toString());
-    }
-
-    /**
-     * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
-     * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
-     * is rolledback.
-     *
-     * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
-     * store.
-     * @return the return value of the callable.
-     * @throws StoreException thrown if the workflow store could not perform an operation.
-     * @throws CommandException thrown if the command could not perform its operation.
-     */
-    protected abstract T call(S store) throws StoreException, CommandException;
-
-    // to do
-    // need to implement on all sub commands and break down the transactions
-
-    // protected abstract T execute(String id) throws CommandException;
-
-    /**
-     * Command subclasses must implement this method correct Store can be passed to call(store);
-     *
-     * @return the Store class for use by Callable
-     * @throws CommandException thrown if the command could not perform its operation.
-     */
-    protected abstract Class<? extends Store> getStoreClass();
-
-    /**
-     * Set the log info with the context of the given coordinator bean.
-     *
-     * @param cBean coordinator bean.
-     */
-    protected void setLogInfo(CoordinatorJobBean cBean) {
-        if (logInfo.getParameter(XLogService.GROUP) == null) {
-            logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
-        }
-        if (logInfo.getParameter(XLogService.USER) == null) {
-            logInfo.setParameter(XLogService.USER, cBean.getUser());
-        }
-        logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
-        logInfo.setParameter(DagXLogInfoService.TOKEN, "");
-        logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
-        XLog.Info.get().setParameters(logInfo);
-    }
-
-    /**
-     * Set the log info with the context of the given coordinator action bean.
-     *
-     * @param action action bean.
-     */
-    protected void setLogInfo(CoordinatorActionBean action) {
-        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
-        // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
-        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
-        XLog.Info.get().setParameters(logInfo);
-    }
-
-    /**
-     * Set the log info with the context of the given workflow bean.
-     *
-     * @param workflow workflow bean.
-     */
-    protected void setLogInfo(WorkflowJobBean workflow) {
-        if (logInfo.getParameter(XLogService.GROUP) == null) {
-            logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
-        }
-        if (logInfo.getParameter(XLogService.USER) == null) {
-            logInfo.setParameter(XLogService.USER, workflow.getUser());
-        }
-        logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
-        logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
-        logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
-        XLog.Info.get().setParameters(logInfo);
-    }
-
-    /**
-     * Set the log info with the context of the given action bean.
-     *
-     * @param action action bean.
-     */
-    protected void setLogInfo(WorkflowActionBean action) {
-        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
-        logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
-        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
-        XLog.Info.get().setParameters(logInfo);
-    }
-
-    /**
-     * Reset the action bean information from the log info.
-     */
-    // TODO check if they are used, else delete
-    protected void resetLogInfoAction() {
-        logInfo.clearParameter(DagXLogInfoService.ACTION);
-        XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
-    }
-
-    /**
-     * Reset the workflow bean information from the log info.
-     */
-    // TODO check if they are used, else delete
-    protected void resetLogInfoWorkflow() {
-        logInfo.clearParameter(DagXLogInfoService.JOB);
-        logInfo.clearParameter(DagXLogInfoService.APP);
-        logInfo.clearParameter(DagXLogInfoService.TOKEN);
-        XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
-        XLog.Info.get().clearParameter(DagXLogInfoService.APP);
-        XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
-    }
-
-    /**
-     * Return the {@link Instrumentation} instance in use.
-     *
-     * @return the {@link Instrumentation} instance in use.
-     */
-    protected Instrumentation getInstrumentation() {
-        return instrumentation;
-    }
-
-    /**
-     * Return the identity.
-     *
-     * @return the identity.
-     */
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append(getType());
-        sb.append(",").append(getPriority());
-        return sb.toString();
-    }
-
-    protected boolean lock(String id) throws InterruptedException {
-        if (id == null || id.length() == 0) {
-            XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
-            return false;
-        }
-        LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
-        if (token != null) {
-            locks.add(token);
-            return true;
-        }
-        else {
-            return false;
-        }
-    }
-
-    /*
-     * TODO - remove store coupling to EM. Store will only contain queries
-     * protected EntityManager getEntityManager() { return
-     * store.getEntityManager(); }
-     */
-    protected T execute(S store) throws CommandException, StoreException {
-        T result = call(store);
-        return result;
-    }
-
-    /**
-     * Get command key
-     *
-     * @return command key
-     */
-    @Override
-    public String getKey(){
-        return this.key;
-    }
-
-    /**
-     * Get command lock key returning the key as an entity key, [not used] Just
-     * to be able to implement XCallable [to be deprecated]
-     *
-     * @return key
-     */
-    @Override
-    public String getEntityKey() {
-        return this.key;
-    }
-
-    /**
-     * set the mode of execution for the callable. True if in interrupt, false
-     * if not [to be deprecated]
-     */
-    public void setInterruptMode(boolean mode) {
-    }
-
-    /**
-     * [to be deprecated]
-     *
-     * @return the mode of execution. true if it is executed as an Interrupt,
-     *         false otherwise
-     */
-    public boolean inInterruptMode() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/CommandException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/CommandException.java b/core/src/main/java/org/apache/oozie/command/CommandException.java
index ce5ef54..3740bc8 100644
--- a/core/src/main/java/org/apache/oozie/command/CommandException.java
+++ b/core/src/main/java/org/apache/oozie/command/CommandException.java
@@ -21,7 +21,7 @@ import org.apache.oozie.XException;
 import org.apache.oozie.ErrorCode;
 
 /**
- * Exception thrown by {@link Command}s.
+ * Exception thrown by {@link XCommand}s.
  */
 public class CommandException extends XException {
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
deleted file mode 100644
index 14dee97..0000000
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionMaterializeCommand.java
+++ /dev/null
@@ -1,375 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.AppType;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.ErrorCode;
-import org.apache.oozie.SLAEventBean;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.SLAEvent.SlaAppType;
-import org.apache.oozie.client.rest.JsonBean;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.Service;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.Instrumentation;
-import org.apache.oozie.sla.SLAOperations;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.db.SLADbOperations;
-import org.jdom.Element;
-
-@SuppressWarnings("deprecation")
-public class CoordActionMaterializeCommand extends CoordinatorCommand<Void> {
-    private String jobId;
-    private Date startTime;
-    private Date endTime;
-    private int lastActionNumber = 1; // over-ride by DB value
-    private final XLog log = XLog.getLog(getClass());
-    private String user;
-    private String group;
-    private List<JsonBean> insertList = new ArrayList<JsonBean>();
-    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
-
-    /**
-     * Default timeout for catchup jobs, in minutes, after which coordinator input check will timeout
-     */
-    public static final String CONF_DEFAULT_TIMEOUT_CATCHUP = Service.CONF_PREFIX + "coord.catchup.default.timeout";
-
-    public CoordActionMaterializeCommand(String jobId, Date startTime, Date endTime) {
-        super("coord_action_mater", "coord_action_mater", 1, XLog.STD, false);
-        this.jobId = jobId;
-        this.startTime = startTime;
-        this.endTime = endTime;
-    }
-
-    @Override
-    protected Void call(CoordinatorStore store) throws CommandException {
-        CoordJobGetJPAExecutor getCoordJob = new CoordJobGetJPAExecutor(jobId);
-        CoordinatorJobBean job;
-        try {
-            job = Services.get().get(JPAService.class).execute(getCoordJob);
-        }
-        catch (JPAExecutorException jex) {
-            throw new CommandException(jex);
-        }
-        setLogInfo(job);
-        if (job.getLastActionTime() != null && job.getLastActionTime().compareTo(endTime) >= 0) {
-            log.info("ENDED Coordinator materialization for jobId = " + jobId
-                    + " Action is *already* materialized for Materialization start time = " + startTime + " : Materialization end time = " + endTime + " Job status = " + job.getStatusStr());
-            return null;
-        }
-
-        if (endTime.after(job.getEndTime())) {
-            log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization end time = " + endTime
-                    + " surpasses coordinator job's end time = " + job.getEndTime() + " Job status = " + job.getStatusStr());
-            return null;
-        }
-
-        if (job.getPauseTime() != null && !startTime.before(job.getPauseTime())) {
-            log.info("ENDED Coordinator materialization for jobId = " + jobId + " Materialization start time = " + startTime
-                    + " is after or equal to coordinator job's pause time = " + job.getPauseTime() + " Job status = " + job.getStatusStr());
-            // pausetime blocks real materialization - we change job's status back to RUNNING;
-            if (job.getStatus() == CoordinatorJob.Status.PREMATER) {
-                job.setStatus(CoordinatorJob.Status.RUNNING);
-            }
-            updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_STATUS, job));
-            return null;
-        }
-
-        this.user = job.getUser();
-        this.group = job.getGroup();
-
-        if (job.getStatus().equals(CoordinatorJobBean.Status.PREMATER)) {
-            Configuration jobConf = null;
-            log.debug("start job :" + jobId + " Materialization ");
-            try {
-                jobConf = new XConfiguration(new StringReader(job.getConf()));
-            }
-            catch (IOException ioe) {
-                log.warn("Configuration parse error. read from DB :" + job.getConf(), ioe);
-                throw new CommandException(ErrorCode.E1005, ioe.getMessage(), ioe);
-            }
-
-            try {
-                materializeJobs(false, job, jobConf, store);
-                updateJobTable(job, store);
-            }
-            catch (CommandException ex) {
-                log.warn("Exception occurs:" + ex + " Making the job failed ");
-                job.setStatus(CoordinatorJobBean.Status.FAILED);
-                updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job));
-            }
-            catch (Exception e) {
-                log.error("Excepion thrown :", e);
-                throw new CommandException(ErrorCode.E1001, e.getMessage(), e);
-            }
-        }
-        else {
-            log.info("WARN: action is not in PREMATER state!  It's in state=" + job.getStatus());
-        }
-        return null;
-    }
-
-    /**
-     * Create action instances starting from "start-time" to end-time" and store them into Action table.
-     *
-     * @param dryrun
-     * @param jobBean
-     * @param conf
-     * @param store
-     * @throws Exception
-     */
-    protected String materializeJobs(boolean dryrun, CoordinatorJobBean jobBean, Configuration conf,
-                                     CoordinatorStore store) throws Exception {
-        String jobXml = jobBean.getJobXml();
-        Element eJob = XmlUtils.parseXml(jobXml);
-        // TODO: always UTC?
-        TimeZone appTz = DateUtils.getTimeZone(jobBean.getTimeZone());
-        // TimeZone appTz = DateUtils.getTimeZone("UTC");
-        int frequency = Integer.valueOf(jobBean.getFrequency());
-        TimeUnit freqTU = TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
-        TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
-        Calendar start = Calendar.getInstance(appTz);
-        start.setTime(startTime);
-        DateUtils.moveToEnd(start, endOfFlag);
-        Calendar end = Calendar.getInstance(appTz);
-        end.setTime(endTime);
-        lastActionNumber = jobBean.getLastActionNumber();
-        // DateUtils.moveToEnd(end, endOfFlag);
-        log.info("   *** materialize Actions for tz=" + appTz.getDisplayName() + ",\n start=" + start.getTime()
-                + ", end=" + end.getTime() + "\n TimeUNIT " + freqTU.getCalendarUnit() + " Frequency :" + frequency
-                + ":" + freqTU + " lastActionNumber " + lastActionNumber);
-        // Keep the actual start time
-        Calendar origStart = Calendar.getInstance(appTz);
-        origStart.setTime(jobBean.getStartTimestamp());
-        // Move to the End of duration, if needed.
-        DateUtils.moveToEnd(origStart, endOfFlag);
-        // Cloning the start time to be used in loop iteration
-        Calendar effStart = (Calendar) origStart.clone();
-        // Move the time when the previous action finished
-        effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
-
-        String action = null;
-        StringBuilder actionStrings = new StringBuilder();
-        Date jobPauseTime = jobBean.getPauseTime();
-        Calendar pause = null;
-        if (jobPauseTime != null) {
-            pause = Calendar.getInstance(appTz);
-            pause.setTime(DateUtils.convertDateToTimestamp(jobPauseTime));
-        }
-
-        while (effStart.compareTo(end) < 0) {
-            if (pause != null && effStart.compareTo(pause) >= 0) {
-                break;
-            }
-            CoordinatorActionBean actionBean = new CoordinatorActionBean();
-            lastActionNumber++;
-
-            int timeout = jobBean.getTimeout();
-            log.debug(origStart.getTime() + " Materializing action for time=" + effStart.getTime()
-                    + ", lastactionnumber=" + lastActionNumber);
-            Date actualTime = new Date();
-            action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
-                    effStart.getTime(), actualTime, lastActionNumber, conf, actionBean);
-            int catchUpTOMultiplier = 1; // This value might be could be changed in future
-            if (actionBean.getNominalTimestamp().before(jobBean.getCreatedTimestamp())) {
-                // Catchup action
-                timeout = catchUpTOMultiplier * timeout;
-                // actionBean.setTimeOut(Services.get().getConf().getInt(CONF_DEFAULT_TIMEOUT_CATCHUP,
-                // -1));
-                log.info("Catchup timeout is :" + actionBean.getTimeOut());
-            }
-            actionBean.setTimeOut(timeout);
-
-            if (!dryrun) {
-                storeToDB(actionBean, action, store, jobBean.getAppName()); // Storing to table
-            }
-            else {
-                actionStrings.append("action for new instance");
-                actionStrings.append(action);
-            }
-            // Restore the original start time
-            effStart = (Calendar) origStart.clone();
-            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
-        }
-
-        endTime = new Date(effStart.getTimeInMillis());
-        if (!dryrun) {
-            return action;
-        }
-        else {
-            return actionStrings.toString();
-        }
-    }
-
-    /**
-     * Store an Action into database table.
-     *
-     * @param actionBean
-     * @param actionXml
-     * @param store
-     * @param appName
-     * @throws Exception
-     */
-    private void storeToDB(CoordinatorActionBean actionBean, String actionXml, CoordinatorStore store, String appName)
-            throws Exception {
-        log.debug("In storeToDB() action Id " + actionBean.getId() + " Size of actionXml " + actionXml.length());
-        actionBean.setActionXml(actionXml);
-        insertList.add(actionBean);
-        createActionRegistration(actionXml, actionBean, store, appName);
-
-        // TODO: time 100s should be configurable
-        queueCallable(new CoordActionNotificationXCommand(actionBean), 100);
-        queueCallable(new CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId()), 100);
-    }
-
-    /**
-     * @param actionXml
-     * @param actionBean
-     * @param store
-     * @param appName
-     * @throws Exception
-     */
-    private void createActionRegistration(String actionXml, CoordinatorActionBean actionBean, CoordinatorStore store,
-            String appName) throws Exception {
-        Element eAction = XmlUtils.parseXml(actionXml);
-        Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla"));
-        SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, store, actionBean.getId(),
-                SlaAppType.COORDINATOR_ACTION, user, group);
-        if(slaEvent != null) {
-            insertList.add(slaEvent);
-        }
-        // insert into new sla reg table too
-        SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(),
-                AppType.COORDINATOR_ACTION, user, appName, log, false);
-    }
-
-    /**
-     * @param job
-     * @param store
-     * @throws StoreException
-     */
-    private void updateJobTable(CoordinatorJobBean job, CoordinatorStore store) {
-        // TODO: why do we need this? Isn't lastMatTime enough???
-        job.setLastActionTime(endTime);
-        job.setLastActionNumber(lastActionNumber);
-        // if the job endtime == action endtime, then set status of job to
-        // succeeded
-        // we dont need to materialize this job anymore
-        Date jobEndTime = job.getEndTime();
-        if (jobEndTime.compareTo(endTime) <= 0) {
-            job.setStatus(CoordinatorJob.Status.SUCCEEDED);
-            log.info("[" + job.getId() + "]: Update status from PREMATER to SUCCEEDED");
-        }
-        else {
-            job.setStatus(CoordinatorJob.Status.RUNNING);
-            log.info("[" + job.getId() + "]: Update status from PREMATER to RUNNING");
-        }
-        job.setNextMaterializedTime(endTime);
-        updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE, job));
-    }
-
-    @Override
-    protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
-        log.info("STARTED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime + ", endTime="
-                + endTime);
-        try {
-            if (lock(jobId)) {
-                call(store);
-                JPAService jpaService = Services.get().get(JPAService.class);
-                if (jpaService != null) {
-                    try {
-                        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
-                    }
-                    catch (JPAExecutorException je) {
-                        throw new CommandException(je);
-                    }
-                }
-                else {
-                    throw new CommandException(ErrorCode.E0610);
-                }
-            }
-            else {
-                queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime),
-                        LOCK_FAILURE_REQUEUE_INTERVAL);
-                log.warn("CoordActionMaterializeCommand lock was not acquired - failed jobId=" + jobId
-                        + ". Requeing the same.");
-            }
-        }
-        catch (InterruptedException e) {
-            queueCallable(new CoordActionMaterializeCommand(jobId, startTime, endTime), LOCK_FAILURE_REQUEUE_INTERVAL);
-            log.warn("CoordActionMaterializeCommand lock acquiring failed with exception " + e.getMessage()
-                    + " for jobId=" + jobId + " Requeing the same.");
-        }
-        finally {
-            log.info(" ENDED CoordActionMaterializeCommand for jobId=" + jobId + ", startTime=" + startTime
-                    + ", endTime=" + endTime);
-        }
-        return null;
-    }
-
-
-
-    /**
-     * For preliminery testing. Should be removed soon
-     *
-     * @param args
-     * @throws Exception
-     */
-    public static void main(String[] args) throws Exception {
-        new Services().init();
-        try {
-            Date startTime = DateUtils.parseDateUTC("2009-02-01T01:00Z");
-            Date endTime = DateUtils.parseDateUTC("2009-02-02T01:00Z");
-            String jobId = "0000000-091207151850551-oozie-dani-C";
-            CoordActionMaterializeCommand matCmd = new CoordActionMaterializeCommand(jobId, startTime, endTime);
-            matCmd.call();
-        }
-        finally {
-            try {
-                Thread.sleep(60000);
-            }
-            catch (Exception ex) {
-            }
-            new Services().destroy();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 23bafb8..b4b2fef 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -101,6 +101,16 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         this.materializationWindow = materializationWindow;
     }
 
+    public CoordMaterializeTransitionXCommand(CoordinatorJobBean coordJob, int materializationWindow, Date startTime,
+                                              Date endTime) {
+        super("coord_mater", "coord_mater", 1);
+        this.jobId = ParamChecker.notEmpty(coordJob.getId(), "jobId");
+        this.materializationWindow = materializationWindow;
+        this.coordJob = coordJob;
+        this.startMatdTime = startTime;
+        this.endMatdTime = endTime;
+    }
+
     /* (non-Javadoc)
      * @see org.apache.oozie.command.MaterializeTransitionXCommand#transitToNext()
      */
@@ -412,7 +422,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         }
 
         String action = null;
-        int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
+        int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
         int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
         // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
         boolean ignoreMaxActions =

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index 11fde6f..02b30ef 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -56,6 +56,7 @@ import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.TimeUnit;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
 import org.apache.oozie.service.DagXLogInfoService;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -287,14 +288,16 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
     /**
      * Gets the dryrun output.
      *
-     * @param jobId the job id
+     * @param coordJob the coordinatorJobBean
      * @return the dry run
      * @throws Exception the exception
      */
     protected String getDryRun(CoordinatorJobBean coordJob) throws Exception{
+        int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW,
+                CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT);
         Date startTime = coordJob.getStartTime();
         long startTimeMilli = startTime.getTime();
-        long endTimeMilli = startTimeMilli + (3600 * 1000);
+        long endTimeMilli = startTimeMilli + (materializationWindow * 1000);
         Date jobEndTime = coordJob.getEndTime();
         Date endTime = new Date(endTimeMilli);
         if (endTime.compareTo(jobEndTime) > 0) {
@@ -304,8 +307,6 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
         LOG.info("[" + jobId + "]: Update status to RUNNING");
         coordJob.setStatus(Job.Status.RUNNING);
         coordJob.setPending();
-        CoordActionMaterializeCommand coordActionMatCom = new CoordActionMaterializeCommand(jobId, startTime,
-                endTime);
         Configuration jobConf = null;
         try {
             jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
@@ -313,7 +314,8 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
         catch (IOException e1) {
             LOG.warn("Configuration parse error. read from DB :" + coordJob.getConf(), e1);
         }
-        String action = coordActionMatCom.materializeJobs(true, coordJob, jobConf, null);
+        String action = new CoordMaterializeTransitionXCommand(coordJob, materializationWindow, startTime,
+                endTime).materializeActions(true);
         String output = coordJob.getJobXml() + System.getProperty("line.separator")
         + "***actions for instance***" + action;
         return output;
@@ -323,9 +325,9 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
      * Queue MaterializeTransitionXCommand
      */
     protected void queueMaterializeTransitionXCommand(String jobId) {
-        // submit a command to materialize jobs for the next 1 hour (3600 secs)
-        // so we don't wait 10 mins for the Service to run.
-        queue(new CoordMaterializeTransitionXCommand(jobId, 3600), 100);
+        int materializationWindow = conf.getInt(CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW,
+                CoordMaterializeTriggerService.CONF_MATERIALIZATION_WINDOW_DEFAULT);
+        queue(new CoordMaterializeTransitionXCommand(jobId, materializationWindow), 100);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
deleted file mode 100644
index c70e171..0000000
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordinatorCommand.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import org.apache.oozie.command.Command;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.store.WorkflowStore;
-
-public abstract class CoordinatorCommand<T> extends Command<T, CoordinatorStore> {
-
-    public CoordinatorCommand(String name, String type, int priority, int logMask) {
-        super(name, type, priority, logMask);
-    }
-
-    public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore) {
-        super(name, type, priority, logMask, withStore);
-    }
-
-    public CoordinatorCommand(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
-        super(name, type, priority, logMask, (dryrun) ? false : withStore, dryrun);
-    }
-
-    /**
-     * Return the public interface of the Coordinator Store.
-     *
-     * @return {@link WorkflowStore}
-     */
-    public Class<? extends Store> getStoreClass() {
-        return CoordinatorStore.class;
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 1dac7e8..3fbd092 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -61,7 +61,7 @@ public class CoordMaterializeTriggerService implements Service {
     private static final String INSTRUMENTATION_GROUP = "coord_job_mat";
     private static final String INSTR_MAT_JOBS_COUNTER = "jobs";
     public static final int CONF_LOOKUP_INTERVAL_DEFAULT = 300;
-    private static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
+    public static final int CONF_MATERIALIZATION_WINDOW_DEFAULT = 3600;
     private static final int CONF_MATERIALIZATION_SYSTEM_LIMIT_DEFAULT = 50;
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/TestCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/TestCommand.java b/core/src/test/java/org/apache/oozie/command/TestCommand.java
deleted file mode 100644
index 60363bf..0000000
--- a/core/src/test/java/org/apache/oozie/command/TestCommand.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command;
-
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.store.WorkflowStore;
-import org.apache.oozie.store.Store;
-import org.apache.oozie.service.DagXLogInfoService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.XCallable;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.ErrorCode;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-
-public class TestCommand extends XTestCase {
-    private static List<String> EXECUTED = Collections.synchronizedList(new ArrayList<String>());
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        new Services().init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        Services.get().destroy();
-        super.tearDown();
-    }
-
-    private static class DummyXCallable implements XCallable<Void> {
-        private String name;
-        private String key = null;
-
-        public DummyXCallable(String name) {
-            this.name = name;
-            this.key = name + "_" + UUID.randomUUID();
-        }
-
-        @Override
-        public String getName() {
-            return name;
-        }
-
-        @Override
-        public String getType() {
-            return "type";
-        }
-
-        @Override
-        public int getPriority() {
-            return 0;
-        }
-
-        @Override
-        public long getCreatedTime() {
-            return 1;
-        }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("Type:").append(getType());
-            sb.append(",Priority:").append(getPriority());
-            return sb.toString();
-        }
-
-        @Override
-        public void setInterruptMode(boolean mode) {
-        }
-
-        @Override
-        public boolean inInterruptMode() {
-            return false;
-        }
-
-        public Void call() throws Exception {
-            EXECUTED.add(name);
-            return null;
-        }
-
-        @Override
-        public String getKey() {
-            return this.key;
-        }
-
-        @Override
-        public String getEntityKey() {
-            return null;
-        }
-
-    }
-
-    private static class MyCommand extends Command<Object, WorkflowStore> {
-        private boolean exception;
-
-        public MyCommand(boolean exception) {
-            super("test", "test", 1, XLog.OPS);
-            this.exception = exception;
-        }
-
-        @Override
-        protected Object call(WorkflowStore store) throws StoreException, CommandException {
-            assertTrue(logInfo.createPrefix().contains("JOB[job]"));
-            assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]"));
-            assertTrue(logInfo.createPrefix().contains("ACTION[action]"));
-            assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
-            assertNotNull(store);
-            assertEquals("test", getName());
-            assertEquals(1, getPriority());
-            queueCallable(new DummyXCallable("a"));
-            queueCallable(Arrays.asList(new DummyXCallable("b"), new DummyXCallable("c")));
-            queueCallable(new DummyXCallable("d"), 300);
-            queueCallable(new DummyXCallable("e"), 200);
-            queueCallable(new DummyXCallable("f"), 100);
-            queueCallableForException(new DummyXCallable("ex"));
-            if (exception) {
-                throw new CommandException(ErrorCode.E0800);
-            }
-            return null;
-        }
-
-        /**
-         * Return the public interface of the Workflow Store.
-         *
-         * @return {@link WorkflowStore}
-         */
-        @Override
-        public Class<? extends Store> getStoreClass() {
-            return WorkflowStore.class;
-        }
-    }
-
-    public void testDagCommand() throws Exception {
-        XLog.Info.get().clear();
-        XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job");
-        XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action");
-
-        Command command = new MyCommand(false);
-
-        XLog.Info.get().clear();
-        command.call();
-
-        assertTrue(XLog.Info.get().createPrefix().contains("JOB[job]"));
-        assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
-        command.resetLogInfoWorkflow();
-        assertTrue(XLog.Info.get().createPrefix().contains("JOB[-]"));
-        assertTrue(XLog.Info.get().createPrefix().contains("ACTION[action]"));
-        command.resetLogInfoAction();
-        assertTrue(XLog.Info.get().createPrefix().contains("ACTION[-]"));
-
-        waitFor(2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return EXECUTED.size() == 6;
-            }
-        });
-
-        assertEquals(6, EXECUTED.size());
-        assertEquals(Arrays.asList("a", "b", "c", "d", "e", "f"), EXECUTED);
-
-        EXECUTED.clear();
-
-        XLog.Info.get().setParameter(DagXLogInfoService.JOB, "job");
-        XLog.Info.get().setParameter(DagXLogInfoService.ACTION, "action");
-        command = new MyCommand(true);
-
-        try {
-            command.call();
-            fail();
-        }
-        catch (CommandException ex) {
-            //nop
-        }
-
-        waitFor(200, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return EXECUTED.size() == 2;
-            }
-        });
-
-        assertEquals(1, EXECUTED.size());
-        assertEquals(Arrays.asList("ex"), EXECUTED);
-    }
-
-    /**
-     * Return the public interface of the Workflow Store.
-     *
-     * @return {@link WorkflowStore}
-     */
-    public Class<? extends Store> getStoreClass() {
-        return WorkflowStore.class;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
deleted file mode 100644
index 6278515..0000000
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionMaterializeCommand.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.oozie.command.coord;
-
-import java.util.Date;
-import java.util.List;
-
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
-import org.apache.oozie.SLAEventBean;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.store.CoordinatorStore;
-import org.apache.oozie.store.SLAStore;
-import org.apache.oozie.store.StoreException;
-import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.DateUtils;
-
-@SuppressWarnings("deprecation")
-public class TestCoordActionMaterializeCommand extends XTestCase {
-    private Services services;
-
-    @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        services = new Services();
-        services.init();
-    }
-
-    @Override
-    protected void tearDown() throws Exception {
-        services.destroy();
-        super.tearDown();
-    }
-
-    public void testActionMater() throws Exception {
-        String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-        String startTimeStr = "2009-03-06T10:00Z";
-        Date startTime = DateUtils.parseDateOozieTZ(startTimeStr);
-        Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
-        addRecordToJobTable(jobId, startTime, endTime);
-        new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
-        CoordinatorActionBean action = checkCoordAction(jobId + "@1");
-        assertEquals(action.getActionNumber(), 1);
-        assertEquals(action.getNominalTime(), startTime);
-        assertTrue(action.getMissingDependencies().indexOf("file:///tmp/coord/workflows/2009/03/01") > -1);
-        String actionXml = action.getActionXml();
-        String slaNotifMsg = actionXml.substring(actionXml.indexOf("<sla:notification-msg>") + 22,
-                actionXml.indexOf("</sla:notification-msg>"));
-        String expectedSlaMsg = "Notifying User for 2009-03-06T10:00Z,"
-                + DateUtils.formatDateOozieTZ(action.getCreatedTime()) + ",2009-03-06,2009-03-07T10:00Z,"
-                + action.getId() + ",NAME,testValue,testUser,"
-                + "file:///tmp/coord/workflows/2009/22myOutputDatabase,myOutputTable,'datestamp=20090306'";
-        assertEquals(expectedSlaMsg, slaNotifMsg);
-    }
-
-    public void testActionMaterWithPauseTime1() throws Exception {
-        String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
-        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
-        Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
-        Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
-        addRecordToJobTable(jobId, startTime, endTime, pauseTime);
-        new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
-        checkCoordActions(jobId, 1, null);
-    }
-
-    public void testActionMaterWithPauseTime2() throws Exception {
-        String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
-        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
-        Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
-        Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
-        addRecordToJobTable(jobId, startTime, endTime, pauseTime);
-        new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
-        checkCoordActions(jobId, 2, null);
-    }
-
-    public void testActionMaterWithPauseTime3() throws Exception {
-        String jobId = "0000000-" + new Date().getTime() + "-testActionMater-C";
-
-        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
-        Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
-        Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
-        addRecordToJobTable(jobId, startTime, endTime, pauseTime);
-        new CoordActionMaterializeCommand(jobId, startTime, endTime).call();
-        checkCoordActions(jobId, 0, CoordinatorJob.Status.RUNNING);
-    }
-
-    private void addRecordToJobTable(String jobId, Date startTime, Date endTime) throws StoreException {
-        CoordinatorStore store = new CoordinatorStore(false);
-        CoordinatorJobBean coordJob = new CoordinatorJobBean();
-        coordJob.setId(jobId);
-        coordJob.setAppName("testApp");
-        coordJob.setStartTime(startTime);
-        coordJob.setEndTime(endTime);
-        coordJob.setTimeUnit(Timeunit.DAY);
-        coordJob.setAppPath("testAppPath");
-        coordJob.setStatus(CoordinatorJob.Status.PREMATER);
-        coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
-        coordJob.setLastModifiedTime(new Date());
-        coordJob.setUser("testUser");
-        coordJob.setGroup("testGroup");
-        coordJob.setTimeZone("America/Los_Angeles");
-        String confStr = "<configuration><property><name>testProperty</name><value>testValue</value></property>"
-                + "<property><name>user.name</name><value>testUser</value></property></configuration>";
-        coordJob.setConf(confStr);
-        String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"1\" start='2009-03-06T010:00Z' end='2009-03-11T10:00Z' timezone='America/Los_Angeles' freq_timeunit='DAY' end_of_duration='NONE'>";
-        appXml += "<controls>";
-        appXml += "<timeout>10</timeout>";
-        appXml += "<concurrency>2</concurrency>";
-        appXml += "<execution>LIFO</execution>";
-        appXml += "</controls>";
-        appXml += "<input-events>";
-        appXml += "<data-in name='A' dataset='a'>";
-        appXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
-        appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${MONTH}/${DAY}</uri-template>";
-        appXml += "</dataset>";
-        appXml += "<instance>${coord:current(0)}</instance>";
-        appXml += "<instance>${coord:latest(-1)}</instance>";
-        appXml += "</data-in>";
-        appXml += "</input-events>";
-        appXml += "<output-events>";
-        appXml += "<data-out name='LOCAL_A' dataset='local_a'>";
-        appXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
-        appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
-        appXml += "</dataset>";
-        appXml += "<instance>${coord:current(-1)}</instance>";
-        appXml += "</data-out>";
-        appXml += "<data-out name='aggregated-logs' dataset='Stats'>";
-        appXml += "<dataset name='Stats' frequency='1' initial-instance='2009-01-01T01:00Z' ";
-        appXml += "timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
-        appXml += "<uri-template>hcat://foo:11002/myOutputDatabase/myOutputTable/datestamp=${YEAR}${MONTH}${DAY}";
-        appXml += "</uri-template></dataset>";
-        appXml += "<instance>${coord:current(0)}</instance>";
-        appXml += "</data-out>";
-        appXml += "</output-events>";
-        appXml += "<action>";
-        appXml += "<workflow>";
-        appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
-        appXml += "<configuration>";
-        appXml += "<property>";
-        appXml += "<name>inputA</name>";
-        appXml += "<value>${coord:dataIn('A')}</value>";
-        appXml += "</property>";
-        appXml += "<property>";
-        appXml += "<name>inputB</name>";
-        appXml += "<value>${coord:dataOut('LOCAL_A')}</value>";
-        appXml += "</property>";
-        appXml += "</configuration>";
-        appXml += "</workflow>";
-        appXml += " <sla:info>"
-                + " <sla:app-name>test-app</sla:app-name>"
-                + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>"
-                + " <sla:should-start>5</sla:should-start>"
-                + " <sla:should-end>120</sla:should-end>"
-                + " <sla:notification-msg>Notifying User for ${coord:nominalTime()},${coord:actualTime()},"
-                + "${coord:formatTime(coord:nominalTime(),'yyyy-MM-dd')},${coord:dateOffset(coord:nominalTime(), 1, 'DAY')},"
-                + "${coord:actionId()},${coord:name()},${coord:conf('testProperty')},${coord:user()},${coord:dataOut('LOCAL_A')}"
-                + "${coord:databaseOut('aggregated-logs')},${coord:tableOut('aggregated-logs')},"
-                + "${coord:dataOutPartitions('aggregated-logs')}"
-                + "</sla:notification-msg>" + " <sla:alert-contact>abc@example.com</sla:alert-contact>"
-                + " <sla:dev-contact>abc@example.com</sla:dev-contact>"
-                + " <sla:qa-contact>abc@example.com</sla:qa-contact>"
-                + " <sla:se-contact>abc@example.com</sla:se-contact>" + "</sla:info>";
-        appXml += "</action>";
-        appXml += "</coordinator-app>";
-        /*try {
-            System.out.println(XmlUtils.prettyPrint(XmlUtils.parseXml(appXml)));
-            ;
-        }
-        catch (JDOMException e1) {
-            // TODO Auto-generated catch block
-            e1.printStackTrace();
-        }*/
-        coordJob.setJobXml(appXml);
-        coordJob.setLastActionNumber(0);
-        coordJob.setFrequency("1");
-        try {
-            coordJob.setEndTime(DateUtils.parseDateOozieTZ("2009-03-11T10:00Z"));
-        }
-        catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            fail("Could not set end time");
-        }
-        try {
-            store.beginTrx();
-            store.insertCoordinatorJob(coordJob);
-            store.commitTrx();
-        }
-        catch (StoreException se) {
-            se.printStackTrace();
-            store.rollbackTrx();
-            fail("Unable to insert the test job record to table");
-            throw se;
-        }
-        finally {
-            store.closeTrx();
-        }
-    }
-
-    private CoordinatorActionBean checkCoordAction(String actionId) throws StoreException {
-        CoordinatorStore store = new CoordinatorStore(false);
-        try {
-            CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
-            SLAStore slaStore = new SLAStore(store);
-            long lastSeqId[] = new long[1];
-            List<SLAEventBean> slaEvents = slaStore.getSLAEventListNewerSeqLimited(0, 10, lastSeqId);
-            // System.out.println("AAA " + slaEvents.size() + " : " +
-            // lastSeqId[0]);
-            if (slaEvents.size() == 0) {
-                fail("Unable to GET any record of sequence id greater than 0");
-            }
-            return action;
-        }
-        catch (StoreException se) {
-            se.printStackTrace();
-            fail("Action ID " + actionId + " was not stored properly in db");
-        }
-        return null;
-    }
-
-    private void addRecordToJobTable(String jobId, Date startTime, Date endTime, Date pauseTime) throws StoreException {
-        CoordinatorStore store = new CoordinatorStore(false);
-        CoordinatorJobBean coordJob = new CoordinatorJobBean();
-        coordJob.setId(jobId);
-        coordJob.setAppName("testApp");
-        coordJob.setStartTime(startTime);
-        coordJob.setEndTime(endTime);
-        coordJob.setPauseTime(pauseTime);
-        coordJob.setTimeUnit(Timeunit.MINUTE);
-        coordJob.setAppPath("testAppPath");
-        coordJob.setStatus(CoordinatorJob.Status.PREMATER);
-        coordJob.setCreatedTime(new Date()); // TODO: Do we need that?
-        coordJob.setLastModifiedTime(new Date());
-        coordJob.setUser("testUser");
-        coordJob.setGroup("testGroup");
-        coordJob.setTimeZone("America/Los_Angeles");
-        String confStr = "<configuration></configuration>";
-        coordJob.setConf(confStr);
-        String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1' name='NAME' frequency=\"5\" start='2009-03-06T010:00Z' end='2009-03-06T10:14Z' timezone='America/Los_Angeles' freq_timeunit='MINUTE' end_of_duration='NONE'>";
-        appXml += "<controls>";
-        appXml += "<timeout>10</timeout>";
-        appXml += "<concurrency>2</concurrency>";
-        appXml += "<execution>LIFO</execution>";
-        appXml += "</controls>";
-        appXml += "<action>";
-        appXml += "<workflow>";
-        appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
-        appXml += "<configuration>";
-        appXml += "</configuration>";
-        appXml += "</workflow>";
-        appXml += " <sla:info>"
-                // + " <sla:client-id>axonite-blue</sla:client-id>"
-                + " <sla:app-name>test-app</sla:app-name>"
-                + " <sla:nominal-time>${coord:nominalTime()}</sla:nominal-time>"
-                + " <sla:should-start>5</sla:should-start>"
-                + " <sla:should-end>120</sla:should-end>"
-                + " <sla:notification-msg>Notifying User for ${coord:nominalTime()} nominal time </sla:notification-msg>"
-                + " <sla:alert-contact>abc@example.com</sla:alert-contact>"
-                + " <sla:dev-contact>abc@example.com</sla:dev-contact>"
-                + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>"
-                + "</sla:info>";
-        appXml += "</action>";
-        appXml += "</coordinator-app>";
-
-        coordJob.setJobXml(appXml);
-        coordJob.setLastActionNumber(0);
-        coordJob.setFrequency("5");
-        try {
-            store.beginTrx();
-            store.insertCoordinatorJob(coordJob);
-            store.commitTrx();
-        }
-        catch (StoreException se) {
-            se.printStackTrace();
-            store.rollbackTrx();
-            fail("Unable to insert the test job record to table");
-            throw se;
-        }
-        finally {
-            store.closeTrx();
-        }
-    }
-
-    private void checkCoordActions(String jobId, int number, CoordinatorJob.Status status) throws StoreException {
-        CoordinatorStore store = new CoordinatorStore(false);
-        try {
-            int coordActionsCount = store.getActionsForCoordinatorJob(jobId, false);
-            if (coordActionsCount != number) {
-                fail("Should have " + number + " actions created for job " + jobId);
-            }
-
-            if (status != null) {
-                CoordinatorJob job = store.getCoordinatorJob(jobId, false);
-                if (job.getStatus() != status) {
-                    fail("Job status " + job.getStatus() + " should be " + status);
-                }
-            }
-        }
-        catch (StoreException se) {
-            se.printStackTrace();
-            fail("Job ID " + jobId + " was not stored properly in db");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
index 7f7a387..aab8efd 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordELExtensions.java
@@ -55,10 +55,10 @@ public class TestCoordELExtensions extends XDataTestCase {
         Date startTime = DateUtils.parseDateUTC("2009-03-06T010:00Z");
         Date endTime = DateUtils.parseDateUTC("2009-03-07T12:00Z");
         CoordinatorJobBean job = createCoordJob("coord-job-for-elext.xml",
-                CoordinatorJob.Status.PREMATER, startTime, endTime, false, false, 0);
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
         addRecordToCoordJobTable(job);
 
-        new CoordActionMaterializeCommand(job.getId(), startTime, endTime).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
         checkCoordAction(job.getId() + "@1");
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 5b22abc..60eff2f 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -31,7 +31,6 @@ import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
@@ -382,6 +381,21 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         checkCoordActions(job.getId(), 0, CoordinatorJob.Status.PAUSED);
     }
 
+    public void testGetDryrun() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
+        CoordinatorJobBean job = createCoordJob(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
+        job.setFrequency("5");
+        job.setTimeUnit(Timeunit.MINUTE);
+        job.setMatThrottling(20);
+        String dryRunOutput = new CoordMaterializeTransitionXCommand(job, 3600, startTime, endTime).materializeActions(true);
+        String[] actions = dryRunOutput.split("action for new instance");
+        assertEquals(3, actions.length -1);
+        for(int i = 1; i < actions.length; i++) {
+            assertTrue(actions[i].contains("action-nominal-time"));
+        }
+    }
+
     public void testTimeout() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 190bad7..be5e84a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -836,7 +836,7 @@
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
                     <artifactId>maven-surefire-plugin</artifactId>
-                    <version>2.12</version>
+                    <version>2.12.2</version>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>

http://git-wip-us.apache.org/repos/asf/oozie/blob/31365255/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9966933..6ec2bcb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,4 +1,5 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-1846 Convert CoordActionMaterializeCommand to an XCommand and remove Command (seoeun25 via shwethags)
 OOZIE-1925 upgrade tomcat to 6.0.41 (rkanter via shwethags)
 OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)