You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by km...@apache.org on 2018/11/22 10:32:39 UTC

oozie git commit: OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton)

Repository: oozie
Updated Branches:
  refs/heads/master 34289c4c7 -> 27e4bf168


OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/27e4bf16
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/27e4bf16
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/27e4bf16

Branch: refs/heads/master
Commit: 27e4bf1688a6a7750b9c8454de5021337696fd61
Parents: 34289c4
Author: Julia Kinga Marton <km...@apache.org>
Authored: Thu Nov 22 11:31:12 2018 +0100
Committer: Julia Kinga Marton <km...@apache.org>
Committed: Thu Nov 22 11:31:12 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/oozie/command/XCommand.java |   2 +-
 .../apache/oozie/coord/CoordELFunctions.java    | 683 ++++++++++++-------
 .../oozie/command/coord/CoordELExtensions.java  |   7 +-
 .../oozie/coord/TestOozieTimeUnitConverter.java |  76 +++
 release-log.txt                                 |   1 +
 5 files changed, 503 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index a80444e..7b2dbd5 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -218,7 +218,7 @@ public abstract class XCommand<T> implements XCallable<T> {
                         getLockTimeOut(), getName());
             }
             else {
-                throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
+                throw new CommandException(ErrorCode.E0606, getEntityKey(), getLockTimeOut());
             }
         }
         else {

http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
index 10f4f0d..c3fecd8 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.coord;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.commons.lang.StringUtils;
@@ -28,6 +30,7 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.dependency.URIHandler.Context;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
@@ -37,18 +40,23 @@ import org.apache.oozie.util.XLog;
 import org.jdom.JDOMException;
 
 import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.List;
 import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * This class implements the EL function related to coordinator
  */
 
 public class CoordELFunctions {
+    private static final XLog LOG = XLog.getLog(CoordELFunctions.class);
+
     final public static String DATASET = "oozie.coord.el.dataset.bean";
     final public static String COORD_ACTION = "oozie.coord.el.app.bean";
     final public static String CONFIGURATION = "oozie.coord.el.conf";
@@ -320,105 +328,8 @@ public class CoordELFunctions {
         return coord_futureRange_sync(n, n, instance);
     }
 
-    private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
-        final XLog LOG = XLog.getLog(CoordELFunctions.class);
-        final Thread currentThread = Thread.currentThread();
-        ELEvaluator eval = ELEvaluator.getCurrent();
-        String retVal = "";
-        int datasetFrequency = (int) getDSFrequency();// in minutes
-        TimeUnit dsTimeUnit = getDSTimeUnit();
-        int[] instCount = new int[1];
-        Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
-        StringBuilder resolvedInstances = new StringBuilder();
-        StringBuilder resolvedURIPaths = new StringBuilder();
-        if (nominalInstanceCal != null) {
-            Calendar initInstance = getInitialInstanceCal();
-            nominalInstanceCal = (Calendar) initInstance.clone();
-            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-
-            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
-            if (ds == null) {
-                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
-            }
-            String uriTemplate = ds.getUriTemplate();
-            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
-            if (conf == null) {
-                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
-            }
-            int available = 0, checkedInstance = 0;
-            boolean resolved = false;
-            String user = ParamChecker
-                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
-            String doneFlag = ds.getDoneFlag();
-            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-            URIHandler uriHandler = null;
-            Context uriContext = null;
-            try {
-                while (instance >= checkedInstance && !currentThread.isInterrupted()) {
-                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
-                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
-                    if (uriHandler == null) {
-                        URI uri = new URI(uriPath);
-                        uriHandler = uriService.getURIHandler(uri);
-                        uriContext = uriHandler.getContext(uri, conf, user, true);
-                    }
-                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
-                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
-                        if (available == endOffset) {
-                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
-                            resolved = true;
-                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
-                            resolvedURIPaths.append(uriPath);
-                            retVal = resolvedInstances.toString();
-                            eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
-                            break;
-                        }
-                        else if (available >= startOffset) {
-                            LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
-                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
-                                    INSTANCE_SEPARATOR);
-                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
-
-                        }
-                        available++;
-                    }
-                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
-                    nominalInstanceCal = (Calendar) initInstance.clone();
-                    instCount[0]++;
-                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-                    checkedInstance++;
-                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
-                }
-                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
-                    eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
-                }
-
-            }
-            finally {
-                if (uriContext != null) {
-                    uriContext.destroy();
-                }
-            }
-            if (!resolved) {
-                // return unchanged future function with variable 'is_resolved'
-                // to 'false'
-                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
-                if (startOffset == endOffset) {
-                    retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
-                }
-                else {
-                    retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
-                }
-            }
-            else {
-                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
-            }
-        }
-        else {// No feasible nominal time
-            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
-            retVal = "";
-        }
-        return retVal;
+    private static String coord_futureRange_sync(final int startOffset, final int endOffset, final int instance) throws Exception {
+        return new FutureEvaluator(startOffset, endOffset, instance).evaluate();
     }
 
     /**
@@ -843,7 +754,7 @@ public class CoordELFunctions {
     }
 
     public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
-        int[] instanceCount = new int[1];
+        final AtomicInteger instanceCount = new AtomicInteger(0);
         Calendar startInstanceCal = DateUtils.getCalendar(startInstance);
         Calendar currentInstance = getCurrentInstance(startInstanceCal.getTime(), instanceCount);
         // getCurrentInstance() returns null, which means startInstance is less
@@ -859,7 +770,7 @@ public class CoordELFunctions {
                             + DateUtils.formatDateOozieTZ(getInitialInstanceCal()) + " and start-instance is "
                             + DateUtils.formatDateOozieTZ(startInstanceCal));
         }
-        int[] nominalCount = new int[1];
+        final AtomicInteger nominalCount = new AtomicInteger(0);
         if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
             throw new CommandException(ErrorCode.E1010,
                     "initial-instance should be equal or earlier than the nominal time. initial-instance is "
@@ -868,11 +779,11 @@ public class CoordELFunctions {
         // getCurrentInstance return offset relative to initial instance.
         // start instance offset - nominal offset = start offset relative to
         // nominal time-stamp.
-        int start = instanceCount[0] - nominalCount[0];
+        int start = instanceCount.get() - nominalCount.get();
         if (start > end) {
             throw new CommandException(ErrorCode.E1010,
                     "start-instance should be equal or earlier than the end-instance. startInstance is "
-                            + startInstance + " which is equivalent to current (" + instanceCount[0]
+                            + startInstance + " which is equivalent to current (" + instanceCount.get()
                             + ") but end is specified as current (" + end + ")");
         }
         return ph2_coord_currentRange(start, end);
@@ -1051,7 +962,7 @@ public class CoordELFunctions {
         final XLog LOG = XLog.getLog(CoordELFunctions.class);
         int datasetFrequency = getDSFrequency();// in minutes
         TimeUnit dsTimeUnit = getDSTimeUnit();
-        int[] instCount = new int[1];// used as pass by ref
+        final AtomicInteger instCount = new AtomicInteger(0);// used as pass by ref
         Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
         if (nominalInstanceCal == null) {
             LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
@@ -1062,7 +973,7 @@ public class CoordELFunctions {
             Calendar initInstance = getInitialInstanceCal();
             // Add in the reverse order - newest instance first.
             nominalInstanceCal = (Calendar) initInstance.clone();
-            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency);
+            nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount.get() + start) * datasetFrequency);
             List<String> instances = new ArrayList<String>();
             for (int i = start; i <= end; i++) {
                 if (nominalInstanceCal.compareTo(initInstance) < 0) {
@@ -1153,108 +1064,7 @@ public class CoordELFunctions {
     }
 
     private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
-        final XLog LOG = XLog.getLog(CoordELFunctions.class);
-        final Thread currentThread = Thread.currentThread();
-        ELEvaluator eval = ELEvaluator.getCurrent();
-        String retVal = "";
-        int datasetFrequency = (int) getDSFrequency();// in minutes
-        TimeUnit dsTimeUnit = getDSTimeUnit();
-        int[] instCount = new int[1];
-        boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
-        Calendar nominalInstanceCal;
-        if (useCurrentTime) {
-            nominalInstanceCal = getCurrentInstance(new Date(), instCount);
-        }
-        else {
-            nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
-        }
-        StringBuilder resolvedInstances = new StringBuilder();
-        StringBuilder resolvedURIPaths = new StringBuilder();
-        if (nominalInstanceCal != null) {
-            Calendar initInstance = getInitialInstanceCal();
-            SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
-            if (ds == null) {
-                throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
-            }
-            String uriTemplate = ds.getUriTemplate();
-            Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
-            if (conf == null) {
-                throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
-            }
-            int available = 0;
-            boolean resolved = false;
-            String user = ParamChecker
-                    .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
-            String doneFlag = ds.getDoneFlag();
-            URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-            URIHandler uriHandler = null;
-            Context uriContext = null;
-            try {
-                while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
-                    ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
-                    String uriPath = uriEval.evaluate(uriTemplate, String.class);
-                    if (uriHandler == null) {
-                        URI uri = new URI(uriPath);
-                        uriHandler = uriService.getURIHandler(uri);
-                        uriContext = uriHandler.getContext(uri, conf, user, true);
-                    }
-                    String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
-                    if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
-                        XLog.getLog(CoordELFunctions.class)
-                        .debug("Found latest(" + available + "): " + uriWithDoneFlag);
-                        if (available == startOffset) {
-                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
-                            resolved = true;
-                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
-                            resolvedURIPaths.append(uriPath);
-                            retVal = resolvedInstances.toString();
-                            eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
-
-                            break;
-                        }
-                        else if (available <= endOffset) {
-                            LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
-                            resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
-                                    INSTANCE_SEPARATOR);
-                            resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
-                        }
-
-                        available--;
-                    }
-                    // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
-                    nominalInstanceCal = (Calendar) initInstance.clone();
-                    instCount[0]--;
-                    nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-                    // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
-                }
-                if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
-                    eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
-                }
-            }
-            finally {
-                if (uriContext != null) {
-                    uriContext.destroy();
-                }
-            }
-            if (!resolved) {
-                // return unchanged latest function with variable 'is_resolved'
-                // to 'false'
-                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
-                if (startOffset == endOffset) {
-                    retVal = "${coord:latest(" + startOffset + ")}";
-                }
-                else {
-                    retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
-                }
-            }
-            else {
-                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
-            }
-        }
-        else {// No feasible nominal time
-            eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
-        }
-        return retVal;
+        return new LatestEvaluator(startOffset, endOffset).evaluate();
     }
 
     /**
@@ -1406,7 +1216,7 @@ public class CoordELFunctions {
      * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
      *         the dataset.
      */
-    public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
+    public static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount) {
         ELEvaluator eval = ELEvaluator.getCurrent();
         return getCurrentInstance(effectiveTime, instanceCount, eval);
     }
@@ -1417,7 +1227,7 @@ public class CoordELFunctions {
      * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
      *         the dataset.
      */
-    private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
+    private static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount, ELEvaluator eval) {
         Date datasetInitialInstance = getInitialInstance(eval);
         TimeUnit dsTimeUnit = getDSTimeUnit(eval);
         TimeZone dsTZ = getDatasetTZ(eval);
@@ -1429,91 +1239,52 @@ public class CoordELFunctions {
         Calendar calEffectiveTime = new GregorianCalendar(dsTZ);
         calEffectiveTime.setTime(effectiveTime);
         if (instanceCount == null) {    // caller doesn't care about this value
-            instanceCount = new int[1];
+            instanceCount = new AtomicInteger(0);
         }
-        instanceCount[0] = 0;
+        instanceCount.set(0);
         if (current.compareTo(calEffectiveTime) > 0) {
             return null;
         }
 
         switch(dsTimeUnit) {
             case MINUTE:
-                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC);
+                instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC));
                 break;
             case HOUR:
-                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC);
+                instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC));
                 break;
             case DAY:
             case END_OF_DAY:
-                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC);
+                instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC));
                 break;
             case WEEK:
             case END_OF_WEEK:
-                instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC);
+                instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC));
                 break;
             case MONTH:
             case END_OF_MONTH:
                 int diffYear = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
-                instanceCount[0] = diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH);
+                instanceCount.set(diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH));
                 break;
             case YEAR:
-                instanceCount[0] = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
+                instanceCount.set(calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR));
                 break;
             default:
                 throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit);
         }
 
-        if (instanceCount[0] > 2) {
-            instanceCount[0] = (instanceCount[0] / dsFreq);
-            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
+        if (instanceCount.get() > 2) {
+            instanceCount.set(instanceCount.get() / dsFreq);
+            current.add(dsTimeUnit.getCalendarUnit(), instanceCount.get() * dsFreq);
         } else {
-            instanceCount[0] = 0;
+            instanceCount.set(0);
         }
         while (!current.getTime().after(effectiveTime)) {
             current.add(dsTimeUnit.getCalendarUnit(), dsFreq);
-            instanceCount[0]++;
+            instanceCount.incrementAndGet();
         }
         current.add(dsTimeUnit.getCalendarUnit(), -dsFreq);
-        instanceCount[0]--;
-        return current;
-    }
-
-    /**
-     * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
-     *
-     * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
-     *         the dataset.
-     */
-    private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
-        Date datasetInitialInstance = getInitialInstance(eval);
-        TimeUnit dsTimeUnit = getDSTimeUnit(eval);
-        TimeZone dsTZ = getDatasetTZ(eval);
-        int dsFreq = getDSFrequency(eval);
-        // Convert Date to Calendar for corresponding TZ
-        Calendar current = Calendar.getInstance();
-        current.setTime(datasetInitialInstance);
-        current.setTimeZone(dsTZ);
-
-        Calendar calEffectiveTime = Calendar.getInstance();
-        calEffectiveTime.setTime(effectiveTime);
-        calEffectiveTime.setTimeZone(dsTZ);
-        if (instanceCount == null) {    // caller doesn't care about this value
-            instanceCount = new int[1];
-        }
-        instanceCount[0] = 0;
-        if (current.compareTo(calEffectiveTime) > 0) {
-            return null;
-        }
-        Calendar origCurrent = (Calendar) current.clone();
-        while (current.compareTo(calEffectiveTime) <= 0) {
-            current = (Calendar) origCurrent.clone();
-            instanceCount[0]++;
-            current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
-        }
-        instanceCount[0]--;
-
-        current = (Calendar) origCurrent.clone();
-        current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
+        instanceCount.decrementAndGet();
         return current;
     }
 
@@ -1706,4 +1477,392 @@ public class CoordELFunctions {
         cal.add(timeUnit.getCalendarUnit(), n);
         return cal;
     }
+
+    /**
+     * Evaluating {@code coord:future()} and {@code coord:futureRange()} data input dependencies.
+     */
+    static final class FutureEvaluator extends RangeEvaluator {
+        private final int instance;
+        private int checkedInstance = 0;
+
+        FutureEvaluator(final int startOffset, final int endOffset, final int instance) {
+            super("future", startOffset, endOffset);
+            this.instance = instance;
+        }
+
+        @Override
+        protected Calendar getNominalInstance() {
+            return getCurrentInstance(getActionCreationtime(), instCount);
+        }
+
+        @Override
+        protected void reset() {
+            super.reset();
+            checkedInstance = 0;
+        }
+
+        @Override
+        protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
+            return instance >= checkedInstance;
+        }
+
+        @Override
+        protected boolean isFirst() {
+            return available == endOffset;
+        }
+
+        @Override
+        protected boolean isInBetween() {
+            return available >= startOffset;
+        }
+
+        @Override
+        protected void stepAvailable() {
+            available++;
+        }
+
+        @Override
+        protected void stepInstanceCount() {
+            instCount.incrementAndGet();
+            checkedInstance++;
+        }
+
+        @Override
+        protected String from() {
+            return String.format("%s, %s", startOffset, instance);
+        }
+
+        @Override
+        protected String fromTo() {
+            return String.format("%s, %s, %s", startOffset, endOffset, instance);
+        }
+    }
+
+    /**
+     * Evaluating {@code coord:latest()} and {@code coord:latestRange()} data input dependencies.
+     */
+    static final class LatestEvaluator extends RangeEvaluator {
+
+        LatestEvaluator(final int startOffset, final int endOffset) {
+            super("latest", startOffset, endOffset);
+        }
+
+        @Override
+        protected Calendar getNominalInstance() {
+            final boolean useCurrentTime = ConfigurationService.getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
+            if (useCurrentTime) {
+                return getCurrentInstance(new Date(), instCount);
+            }
+            else {
+                return getCurrentInstance(getActualTime(), instCount);
+            }
+        }
+
+        @Override
+        protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
+            return nominalInstance.compareTo(initInstance) >= 0;
+        }
+
+        @Override
+        protected boolean isFirst() {
+            return available == startOffset;
+        }
+
+        @Override
+        protected boolean isInBetween() {
+            return available <= endOffset;
+        }
+
+        @Override
+        protected void stepAvailable() {
+            available--;
+        }
+
+        @Override
+        protected void stepInstanceCount() {
+            instCount.decrementAndGet();
+        }
+
+        @Override
+        protected String from() {
+            return String.format("%s", startOffset);
+        }
+
+        @Override
+        protected String fromTo() {
+            return String.format("%s, %s", startOffset, endOffset);
+        }
+    }
+
+    private static abstract class RangeEvaluator {
+        /**
+         * What is the use case: evaluating {@code future} or {@code latest} dataset occurrences.
+         */
+        private final String type;
+
+        /**
+         *
+         */
+        final int startOffset;
+        final int endOffset;
+
+        final AtomicInteger instCount = new AtomicInteger(0);
+        int available = 0;
+
+        RangeEvaluator(final String type, final int startOffset, final int endOffset) {
+            this.type = type;
+            this.startOffset = startOffset;
+            this.endOffset = endOffset;
+        }
+
+        /**
+         * Evaluate the input data dependency's EL function based on HDFS checks for URI existence, or leave it unevaluated.
+         * <p>
+         * Based on following:
+         * <ul>
+         *     <li>{@code startOffset}</li>
+         *     <li>{@code endOffset}</li>
+         *     <li>internal state like {@code instCount} and {@code available}</li>
+         * </ul>
+         * @return timestamp of the {@code future} / {@code latest} HDFS URI based on parameters, and HDFS URI presence, or the
+         * unmodified input EL expression, if not present.
+         * @throws Exception in case of wrong arguments, or HDFS access errors
+         */
+        String evaluate() throws Exception {
+            final Thread currentThread = Thread.currentThread();
+            final ELEvaluator eval = ELEvaluator.getCurrent();
+            String retVal = "";
+            final int datasetFrequency = getDSFrequency();// in minutes
+            final TimeUnit dsTimeUnit = getDSTimeUnit();
+            instCount.set(0);
+
+            Calendar nominalInstance = getNominalInstance();
+
+            final StringBuilder resolvedInstances = new StringBuilder();
+            final StringBuilder resolvedURIPaths = new StringBuilder();
+            if (nominalInstance != null) {
+                final Calendar initInstance = getInitialInstanceCal();
+                SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
+                if (ds == null) {
+                    throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
+                }
+                final String uriTemplate = ds.getUriTemplate();
+                final Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
+                if (conf == null) {
+                    throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
+                }
+
+                reset();
+
+                boolean resolved = false;
+                final String user = ParamChecker
+                        .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
+                final String doneFlag = ds.getDoneFlag();
+                final URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+                URIHandler uriHandler = null;
+                Context uriContext = null;
+                try {
+                    int retries = 0;
+                    final DateFormat dMYHMS = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");
+                    final long maxRetries = new OozieTimeUnitConverter().convertMillis(
+                            (nominalInstance.getTime().getTime() - initInstance.getTime().getTime()) / ds.getFrequency(),
+                            dsTimeUnit);
+                    if (maxRetries > 0) {
+                        LOG.debug("Approximately [{0}] maximal retries going back till [{1}] for checking latest " +
+                                        "dataset existence. [name={2}]",
+                                maxRetries,
+                                dMYHMS.format(initInstance.getTime()),
+                                ds.getName());
+                    }
+                    while (isAvailable(nominalInstance, initInstance) && !currentThread.isInterrupted()) {
+                        final ELEvaluator uriEval = getUriEvaluator(nominalInstance);
+                        final String uriPath = uriEval.evaluate(uriTemplate, String.class);
+                        if (uriHandler == null) {
+                            URI uri = new URI(uriPath);
+                            uriHandler = uriService.getURIHandler(uri);
+                            uriContext = uriHandler.getContext(uri, conf, user, true);
+                        }
+                        final String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+                        LOG.trace("Checking dataset existence. [name={0};uriWithDoneFlag={1};retries={2};nominalTime={3}]",
+                                ds.getName(),
+                                uriWithDoneFlag,
+                                retries,
+                                dMYHMS.format(nominalInstance.getTime()));
+                        final Date now = new Date();
+                        final boolean uriWithDoneFlagExists = uriHandler.exists(new URI(uriWithDoneFlag), uriContext);
+                        final Date later = new Date();
+                        LOG.trace("[{0}] ms elapsed while checking for dataset existence. [name={1};uriWithDoneFlag={2}]",
+                                later.getTime() - now.getTime(),
+                                ds.getName(),
+                                uriWithDoneFlag);
+                        if (uriWithDoneFlagExists) {
+                            LOG.debug("Found current dataset for {0}({1}). [name={2};uriWithDoneFlag={3}",
+                                    type,
+                                    available,
+                                    ds.getName(),
+                                    uriWithDoneFlag);
+                            if (isFirst()) {
+                                LOG.debug("Matched first dataset for {0}({1}), resolving. [name={2};uriWithDoneFlag={3}]",
+                                        type,
+                                        available,
+                                        ds.getName(),
+                                        uriWithDoneFlag);
+                                resolved = true;
+                                resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance));
+                                resolvedURIPaths.append(uriPath);
+                                retVal = resolvedInstances.toString();
+                                eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
+
+                                break;
+                            }
+                            else if (isInBetween()) {
+                                LOG.debug("Matched dataset in between for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
+                                        type,
+                                        available,
+                                        ds.getName(),
+                                        uriWithDoneFlag);
+                                resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance)).append(
+                                        INSTANCE_SEPARATOR);
+                                resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+                            }
+                            else {
+                                LOG.debug("Not matching dataset for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
+                                        type,
+                                        available,
+                                        ds.getName(),
+                                        uriWithDoneFlag);
+                            }
+
+                            stepAvailable();
+                        }
+                        else {
+                            LOG.trace("Could not find dataset. [name={0};uriWithDoneFlag={1}]",
+                                    ds.getName(),
+                                    uriWithDoneFlag);
+                        }
+
+                        stepInstanceCount();
+
+                        nominalInstance = (Calendar) initInstance.clone();
+                        nominalInstance.add(dsTimeUnit.getCalendarUnit(), instCount.get() * datasetFrequency);
+
+                        retries++;
+                    }
+                    if (!StringUtils.isEmpty(resolvedURIPaths.toString())
+                            && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
+                        eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
+                    }
+                }
+                finally {
+                    if (uriContext != null) {
+                        uriContext.destroy();
+                    }
+                }
+                if (!resolved) {
+                    // return unchanged function with variable 'is_resolved'
+                    // to 'false'
+                    eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
+                    if (startOffset == endOffset) {
+                        retVal = String.format("${coord:%s(%s)}", type, from());
+                    }
+                    else {
+                        retVal = String.format("${coord:%sRange(%s)}", type, fromTo());
+                    }
+                }
+                else {
+                    eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
+                }
+            }
+            else {// No feasible nominal time
+                eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
+            }
+
+            return retVal;
+
+        }
+
+        /**
+         * Get the nominal {@link Calendar} instance, that is, as base of the next evaluated occurrence.
+         * @return a {@link Calendar} instance based on {@link #type}
+         */
+        protected abstract Calendar getNominalInstance();
+
+        /**
+         * Reset the evaluator's internal state between two {@link #evaluate()} calls.
+         */
+        protected void reset() {
+            available = 0;
+        }
+
+        /**
+         * Checks whether a dataset is available.
+         * @param nominalInstance the nominal instance
+         * @param initInstance
+         * @return
+         */
+        protected abstract boolean isAvailable(Calendar nominalInstance, Calendar initInstance);
+
+        /**
+         * Checks whether it's the first matching for the given evaluation.
+         * @return
+         */
+        protected abstract boolean isFirst();
+
+        /**
+         * Checks whether it's not the first but a valid matching for the given evaluation.
+         * @return
+         */
+        protected abstract boolean isInBetween();
+
+        /**
+         * Modify the internal state in case of a match.
+         */
+        protected abstract void stepAvailable();
+
+        /**
+         * Modify the internal state in case an instance was checked.
+         */
+        protected abstract void stepInstanceCount();
+
+        /**
+         * Substitution of the range parameters for an open range with only a starting point.
+         * @return
+         */
+        protected abstract String from();
+
+
+        /**
+         * Substitution of the range parameters for a closed range with a starting and an ending point.
+         * @return
+         */
+        protected abstract String fromTo();
+    }
+
+    @VisibleForTesting
+    static class OozieTimeUnitConverter {
+        /**
+         * Convert {@code millis} given {@code source} to {@link java.util.concurrent.TimeUnit}.
+         * @param millis
+         * @param source
+         * @return -1 if no correct {@code source} was given, else the estimated occurrence count of a dataset
+         */
+        long convertMillis(final long millis, final TimeUnit source) {
+            Preconditions.checkNotNull(source, "source has to be filled");
+
+            switch (source) {
+                case YEAR:
+                    return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 365;
+                case MONTH:
+                    return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 31;
+                case DAY:
+                    return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+                case HOUR:
+                    return java.util.concurrent.TimeUnit.HOURS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+                case MINUTE:
+                    return java.util.concurrent.TimeUnit.MINUTES.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+                default:
+                    return -1;
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
index 796d19c..2880641 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
@@ -19,6 +19,7 @@
  package org.apache.oozie.command.coord;
 
 import java.util.Calendar;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.util.ELEvaluator;
@@ -45,19 +46,19 @@ public class CoordELExtensions {
         dsInstanceCal.set(Calendar.SECOND, 0);
         dsInstanceCal.set(Calendar.MILLISECOND, 0);
 
-        int[] instCnt = new int[1];
+        final AtomicInteger instCnt = new AtomicInteger(0);
         Calendar compInstCal = CoordELFunctions
                 .getCurrentInstance(dsInstanceCal.getTime(), instCnt);
         if (compInstCal == null) {
             return "";
         }
-        int dsInstanceCnt = instCnt[0];
+        int dsInstanceCnt = instCnt.get();
 
         compInstCal = CoordELFunctions.getCurrentInstance(nominalInstanceCal.getTime(), instCnt);
         if (compInstCal == null) {
             return "";
         }
-        int nominalInstanceCnt = instCnt[0];
+        int nominalInstanceCnt = instCnt.get();
 
         return "coord:current(" + (dsInstanceCnt - nominalInstanceCnt) + ")";
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
new file mode 100644
index 0000000..c31c5cc
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOozieTimeUnitConverter {
+    @Rule
+    public final ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void whenSourceIsNullNPEIsThrown() {
+        expectedException.expect(NullPointerException.class);
+
+        new CoordELFunctions.OozieTimeUnitConverter().convertMillis(0, null);
+    }
+
+    @Test
+    public void whenSourceIsNotRecognized() {
+        assertMillisConverted(-1, 0, TimeUnit.END_OF_DAY);
+        assertMillisConverted(-1, 0, TimeUnit.END_OF_WEEK);
+        assertMillisConverted(-1, 0, TimeUnit.END_OF_MONTH);
+        assertMillisConverted(-1, 0, TimeUnit.CRON);
+        assertMillisConverted(-1, 0, TimeUnit.NONE);
+    }
+
+    @Test
+    public void whenSourceMillisAreConvertedToMinutesCorrectly() {
+        assertMillisConverted(0, 1, TimeUnit.MINUTE);
+        assertMillisConverted(0, -1, TimeUnit.MINUTE);
+        assertMillisConverted(1, 60_000, TimeUnit.MINUTE);
+        assertMillisConverted(-1, -60_000, TimeUnit.MINUTE);
+    }
+
+    @Test
+    public void whenSourceMillisAreConvertedToHoursCorrectly() {
+        assertMillisConverted(0, 1, TimeUnit.HOUR);
+        assertMillisConverted(0, -1, TimeUnit.HOUR);
+        assertMillisConverted(1, 3_600_000, TimeUnit.HOUR);
+        assertMillisConverted(-1, -3_600_000, TimeUnit.HOUR);
+    }
+
+    @Test
+    public void whenSourceMillisAreConvertedToDaysCorrectly() {
+        assertMillisConverted(0, 1, TimeUnit.DAY);
+        assertMillisConverted(0, -1, TimeUnit.DAY);
+        assertMillisConverted(1, 86_400_000, TimeUnit.DAY);
+        assertMillisConverted(-1, -86_400_000, TimeUnit.DAY);
+    }
+
+    private void assertMillisConverted(final long expectedTUCount, final long millis, final TimeUnit oozieTU) {
+        assertEquals(String.format("%d millis are converted to %s correctly", millis, oozieTU.name()),
+                expectedTUCount,
+                new CoordELFunctions.OozieTimeUnitConverter().convertMillis(millis, oozieTU));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ae54814..1ebbdd4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.2.0 release (trunk - unreleased)
 
+OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton)
 OOZIE-3380 TestCoordMaterializeTransitionXCommand failure after DST change date (asalamon74 via kmarton)
 OOZIE-3338 [build] Remove SVN references (asalamon74 via andras.piros)
 OOZIE-3378 [core] Coordinator action's status is SUBMITTED after E1003 error (asalamon74 via andras.piros)