You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by km...@apache.org on 2018/11/22 10:32:39 UTC
oozie git commit: OOZIE-3381 [coordinator] Enhance logging of
CoordElFunctions (andras.piros via kmarton)
Repository: oozie
Updated Branches:
refs/heads/master 34289c4c7 -> 27e4bf168
OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/27e4bf16
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/27e4bf16
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/27e4bf16
Branch: refs/heads/master
Commit: 27e4bf1688a6a7750b9c8454de5021337696fd61
Parents: 34289c4
Author: Julia Kinga Marton <km...@apache.org>
Authored: Thu Nov 22 11:31:12 2018 +0100
Committer: Julia Kinga Marton <km...@apache.org>
Committed: Thu Nov 22 11:31:12 2018 +0100
----------------------------------------------------------------------
.../java/org/apache/oozie/command/XCommand.java | 2 +-
.../apache/oozie/coord/CoordELFunctions.java | 683 ++++++++++++-------
.../oozie/command/coord/CoordELExtensions.java | 7 +-
.../oozie/coord/TestOozieTimeUnitConverter.java | 76 +++
release-log.txt | 1 +
5 files changed, 503 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/command/XCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java
index a80444e..7b2dbd5 100644
--- a/core/src/main/java/org/apache/oozie/command/XCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/XCommand.java
@@ -218,7 +218,7 @@ public abstract class XCommand<T> implements XCallable<T> {
getLockTimeOut(), getName());
}
else {
- throw new CommandException(ErrorCode.E0606, this.toString(), getLockTimeOut());
+ throw new CommandException(ErrorCode.E0606, getEntityKey(), getLockTimeOut());
}
}
else {
http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
index 10f4f0d..c3fecd8 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
@@ -18,6 +18,8 @@
package org.apache.oozie.coord;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
@@ -28,6 +30,7 @@ import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandler.Context;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
@@ -37,18 +40,23 @@ import org.apache.oozie.util.XLog;
import org.jdom.JDOMException;
import java.net.URI;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.TimeZone;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* This class implements the EL function related to coordinator
*/
public class CoordELFunctions {
+ private static final XLog LOG = XLog.getLog(CoordELFunctions.class);
+
final public static String DATASET = "oozie.coord.el.dataset.bean";
final public static String COORD_ACTION = "oozie.coord.el.app.bean";
final public static String CONFIGURATION = "oozie.coord.el.conf";
@@ -320,105 +328,8 @@ public class CoordELFunctions {
return coord_futureRange_sync(n, n, instance);
}
- private static String coord_futureRange_sync(int startOffset, int endOffset, int instance) throws Exception {
- final XLog LOG = XLog.getLog(CoordELFunctions.class);
- final Thread currentThread = Thread.currentThread();
- ELEvaluator eval = ELEvaluator.getCurrent();
- String retVal = "";
- int datasetFrequency = (int) getDSFrequency();// in minutes
- TimeUnit dsTimeUnit = getDSTimeUnit();
- int[] instCount = new int[1];
- Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
- StringBuilder resolvedInstances = new StringBuilder();
- StringBuilder resolvedURIPaths = new StringBuilder();
- if (nominalInstanceCal != null) {
- Calendar initInstance = getInitialInstanceCal();
- nominalInstanceCal = (Calendar) initInstance.clone();
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
-
- SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
- if (ds == null) {
- throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
- }
- String uriTemplate = ds.getUriTemplate();
- Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
- if (conf == null) {
- throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
- }
- int available = 0, checkedInstance = 0;
- boolean resolved = false;
- String user = ParamChecker
- .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
- String doneFlag = ds.getDoneFlag();
- URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- URIHandler uriHandler = null;
- Context uriContext = null;
- try {
- while (instance >= checkedInstance && !currentThread.isInterrupted()) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- if (uriHandler == null) {
- URI uri = new URI(uriPath);
- uriHandler = uriService.getURIHandler(uri);
- uriContext = uriHandler.getContext(uri, conf, user, true);
- }
- String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
- if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
- if (available == endOffset) {
- LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
- break;
- }
- else if (available >= startOffset) {
- LOG.debug("Matched future(" + available + "): " + uriWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
- INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
-
- }
- available++;
- }
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]++;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- checkedInstance++;
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
- }
- if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
- eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
- }
-
- }
- finally {
- if (uriContext != null) {
- uriContext.destroy();
- }
- }
- if (!resolved) {
- // return unchanged future function with variable 'is_resolved'
- // to 'false'
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
- if (startOffset == endOffset) {
- retVal = "${coord:future(" + startOffset + ", " + instance + ")}";
- }
- else {
- retVal = "${coord:futureRange(" + startOffset + ", " + endOffset + ", " + instance + ")}";
- }
- }
- else {
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
- }
- }
- else {// No feasible nominal time
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
- retVal = "";
- }
- return retVal;
+ private static String coord_futureRange_sync(final int startOffset, final int endOffset, final int instance) throws Exception {
+ return new FutureEvaluator(startOffset, endOffset, instance).evaluate();
}
/**
@@ -843,7 +754,7 @@ public class CoordELFunctions {
}
public static String ph2_coord_absolute_range(String startInstance, int end) throws Exception {
- int[] instanceCount = new int[1];
+ final AtomicInteger instanceCount = new AtomicInteger(0);
Calendar startInstanceCal = DateUtils.getCalendar(startInstance);
Calendar currentInstance = getCurrentInstance(startInstanceCal.getTime(), instanceCount);
// getCurrentInstance() returns null, which means startInstance is less
@@ -859,7 +770,7 @@ public class CoordELFunctions {
+ DateUtils.formatDateOozieTZ(getInitialInstanceCal()) + " and start-instance is "
+ DateUtils.formatDateOozieTZ(startInstanceCal));
}
- int[] nominalCount = new int[1];
+ final AtomicInteger nominalCount = new AtomicInteger(0);
if (getCurrentInstance(getActionCreationtime(), nominalCount) == null) {
throw new CommandException(ErrorCode.E1010,
"initial-instance should be equal or earlier than the nominal time. initial-instance is "
@@ -868,11 +779,11 @@ public class CoordELFunctions {
// getCurrentInstance return offset relative to initial instance.
// start instance offset - nominal offset = start offset relative to
// nominal time-stamp.
- int start = instanceCount[0] - nominalCount[0];
+ int start = instanceCount.get() - nominalCount.get();
if (start > end) {
throw new CommandException(ErrorCode.E1010,
"start-instance should be equal or earlier than the end-instance. startInstance is "
- + startInstance + " which is equivalent to current (" + instanceCount[0]
+ + startInstance + " which is equivalent to current (" + instanceCount.get()
+ ") but end is specified as current (" + end + ")");
}
return ph2_coord_currentRange(start, end);
@@ -1051,7 +962,7 @@ public class CoordELFunctions {
final XLog LOG = XLog.getLog(CoordELFunctions.class);
int datasetFrequency = getDSFrequency();// in minutes
TimeUnit dsTimeUnit = getDSTimeUnit();
- int[] instCount = new int[1];// used as pass by ref
+ final AtomicInteger instCount = new AtomicInteger(0);// used as pass by ref
Calendar nominalInstanceCal = getCurrentInstance(getActionCreationtime(), instCount);
if (nominalInstanceCal == null) {
LOG.warn("If the initial instance of the dataset is later than the nominal time, an empty string is"
@@ -1062,7 +973,7 @@ public class CoordELFunctions {
Calendar initInstance = getInitialInstanceCal();
// Add in the reverse order - newest instance first.
nominalInstanceCal = (Calendar) initInstance.clone();
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount[0] + start) * datasetFrequency);
+ nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), (instCount.get() + start) * datasetFrequency);
List<String> instances = new ArrayList<String>();
for (int i = start; i <= end; i++) {
if (nominalInstanceCal.compareTo(initInstance) < 0) {
@@ -1153,108 +1064,7 @@ public class CoordELFunctions {
}
private static String coord_latestRange_sync(int startOffset, int endOffset) throws Exception {
- final XLog LOG = XLog.getLog(CoordELFunctions.class);
- final Thread currentThread = Thread.currentThread();
- ELEvaluator eval = ELEvaluator.getCurrent();
- String retVal = "";
- int datasetFrequency = (int) getDSFrequency();// in minutes
- TimeUnit dsTimeUnit = getDSTimeUnit();
- int[] instCount = new int[1];
- boolean useCurrentTime = Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
- Calendar nominalInstanceCal;
- if (useCurrentTime) {
- nominalInstanceCal = getCurrentInstance(new Date(), instCount);
- }
- else {
- nominalInstanceCal = getCurrentInstance(getActualTime(), instCount);
- }
- StringBuilder resolvedInstances = new StringBuilder();
- StringBuilder resolvedURIPaths = new StringBuilder();
- if (nominalInstanceCal != null) {
- Calendar initInstance = getInitialInstanceCal();
- SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
- if (ds == null) {
- throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
- }
- String uriTemplate = ds.getUriTemplate();
- Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
- if (conf == null) {
- throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
- }
- int available = 0;
- boolean resolved = false;
- String user = ParamChecker
- .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
- String doneFlag = ds.getDoneFlag();
- URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- URIHandler uriHandler = null;
- Context uriContext = null;
- try {
- while (nominalInstanceCal.compareTo(initInstance) >= 0 && !currentThread.isInterrupted()) {
- ELEvaluator uriEval = getUriEvaluator(nominalInstanceCal);
- String uriPath = uriEval.evaluate(uriTemplate, String.class);
- if (uriHandler == null) {
- URI uri = new URI(uriPath);
- uriHandler = uriService.getURIHandler(uri);
- uriContext = uriHandler.getContext(uri, conf, user, true);
- }
- String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
- if (uriHandler.exists(new URI(uriWithDoneFlag), uriContext)) {
- XLog.getLog(CoordELFunctions.class)
- .debug("Found latest(" + available + "): " + uriWithDoneFlag);
- if (available == startOffset) {
- LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
- resolved = true;
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
- resolvedURIPaths.append(uriPath);
- retVal = resolvedInstances.toString();
- eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
-
- break;
- }
- else if (available <= endOffset) {
- LOG.debug("Matched latest(" + available + "): " + uriWithDoneFlag);
- resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(
- INSTANCE_SEPARATOR);
- resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
- }
-
- available--;
- }
- // nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), -datasetFrequency);
- nominalInstanceCal = (Calendar) initInstance.clone();
- instCount[0]--;
- nominalInstanceCal.add(dsTimeUnit.getCalendarUnit(), instCount[0] * datasetFrequency);
- // DateUtils.moveToEnd(nominalInstanceCal, getDSEndOfFlag());
- }
- if (!StringUtils.isEmpty(resolvedURIPaths.toString()) && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
- eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
- }
- }
- finally {
- if (uriContext != null) {
- uriContext.destroy();
- }
- }
- if (!resolved) {
- // return unchanged latest function with variable 'is_resolved'
- // to 'false'
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
- if (startOffset == endOffset) {
- retVal = "${coord:latest(" + startOffset + ")}";
- }
- else {
- retVal = "${coord:latestRange(" + startOffset + "," + endOffset + ")}";
- }
- }
- else {
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
- }
- }
- else {// No feasible nominal time
- eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
- }
- return retVal;
+ return new LatestEvaluator(startOffset, endOffset).evaluate();
}
/**
@@ -1406,7 +1216,7 @@ public class CoordELFunctions {
* @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
* the dataset.
*/
- public static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[]) {
+ public static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount) {
ELEvaluator eval = ELEvaluator.getCurrent();
return getCurrentInstance(effectiveTime, instanceCount, eval);
}
@@ -1417,7 +1227,7 @@ public class CoordELFunctions {
* @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
* the dataset.
*/
- private static Calendar getCurrentInstance(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
+ private static Calendar getCurrentInstance(Date effectiveTime, AtomicInteger instanceCount, ELEvaluator eval) {
Date datasetInitialInstance = getInitialInstance(eval);
TimeUnit dsTimeUnit = getDSTimeUnit(eval);
TimeZone dsTZ = getDatasetTZ(eval);
@@ -1429,91 +1239,52 @@ public class CoordELFunctions {
Calendar calEffectiveTime = new GregorianCalendar(dsTZ);
calEffectiveTime.setTime(effectiveTime);
if (instanceCount == null) { // caller doesn't care about this value
- instanceCount = new int[1];
+ instanceCount = new AtomicInteger(0);
}
- instanceCount[0] = 0;
+ instanceCount.set(0);
if (current.compareTo(calEffectiveTime) > 0) {
return null;
}
switch(dsTimeUnit) {
case MINUTE:
- instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC);
+ instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / MINUTE_MSEC));
break;
case HOUR:
- instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC);
+ instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / HOUR_MSEC));
break;
case DAY:
case END_OF_DAY:
- instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC);
+ instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / DAY_MSEC));
break;
case WEEK:
case END_OF_WEEK:
- instanceCount[0] = (int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC);
+ instanceCount.set((int) ((effectiveTime.getTime() - datasetInitialInstance.getTime()) / WEEK_MSEC));
break;
case MONTH:
case END_OF_MONTH:
int diffYear = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
- instanceCount[0] = diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH);
+ instanceCount.set(diffYear * 12 + calEffectiveTime.get(Calendar.MONTH) - current.get(Calendar.MONTH));
break;
case YEAR:
- instanceCount[0] = calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR);
+ instanceCount.set(calEffectiveTime.get(Calendar.YEAR) - current.get(Calendar.YEAR));
break;
default:
throw new IllegalArgumentException("Unhandled dataset time unit " + dsTimeUnit);
}
- if (instanceCount[0] > 2) {
- instanceCount[0] = (instanceCount[0] / dsFreq);
- current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
+ if (instanceCount.get() > 2) {
+ instanceCount.set(instanceCount.get() / dsFreq);
+ current.add(dsTimeUnit.getCalendarUnit(), instanceCount.get() * dsFreq);
} else {
- instanceCount[0] = 0;
+ instanceCount.set(0);
}
while (!current.getTime().after(effectiveTime)) {
current.add(dsTimeUnit.getCalendarUnit(), dsFreq);
- instanceCount[0]++;
+ instanceCount.incrementAndGet();
}
current.add(dsTimeUnit.getCalendarUnit(), -dsFreq);
- instanceCount[0]--;
- return current;
- }
-
- /**
- * Find the current instance based on effectiveTime (i.e Action_Creation_Time or Action_Start_Time)
- *
- * @return current instance i.e. current(0) returns null if effectiveTime is earlier than Initial Instance time of
- * the dataset.
- */
- private static Calendar getCurrentInstance_old(Date effectiveTime, int instanceCount[], ELEvaluator eval) {
- Date datasetInitialInstance = getInitialInstance(eval);
- TimeUnit dsTimeUnit = getDSTimeUnit(eval);
- TimeZone dsTZ = getDatasetTZ(eval);
- int dsFreq = getDSFrequency(eval);
- // Convert Date to Calendar for corresponding TZ
- Calendar current = Calendar.getInstance();
- current.setTime(datasetInitialInstance);
- current.setTimeZone(dsTZ);
-
- Calendar calEffectiveTime = Calendar.getInstance();
- calEffectiveTime.setTime(effectiveTime);
- calEffectiveTime.setTimeZone(dsTZ);
- if (instanceCount == null) { // caller doesn't care about this value
- instanceCount = new int[1];
- }
- instanceCount[0] = 0;
- if (current.compareTo(calEffectiveTime) > 0) {
- return null;
- }
- Calendar origCurrent = (Calendar) current.clone();
- while (current.compareTo(calEffectiveTime) <= 0) {
- current = (Calendar) origCurrent.clone();
- instanceCount[0]++;
- current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
- }
- instanceCount[0]--;
-
- current = (Calendar) origCurrent.clone();
- current.add(dsTimeUnit.getCalendarUnit(), instanceCount[0] * dsFreq);
+ instanceCount.decrementAndGet();
return current;
}
@@ -1706,4 +1477,392 @@ public class CoordELFunctions {
cal.add(timeUnit.getCalendarUnit(), n);
return cal;
}
+
+ /**
+ * Evaluating {@code coord:future()} and {@code coord:futureRange()} data input dependencies.
+ */
+ static final class FutureEvaluator extends RangeEvaluator {
+ private final int instance;
+ private int checkedInstance = 0;
+
+ FutureEvaluator(final int startOffset, final int endOffset, final int instance) {
+ super("future", startOffset, endOffset);
+ this.instance = instance;
+ }
+
+ @Override
+ protected Calendar getNominalInstance() {
+ return getCurrentInstance(getActionCreationtime(), instCount);
+ }
+
+ @Override
+ protected void reset() {
+ super.reset();
+ checkedInstance = 0;
+ }
+
+ @Override
+ protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
+ return instance >= checkedInstance;
+ }
+
+ @Override
+ protected boolean isFirst() {
+ return available == endOffset;
+ }
+
+ @Override
+ protected boolean isInBetween() {
+ return available >= startOffset;
+ }
+
+ @Override
+ protected void stepAvailable() {
+ available++;
+ }
+
+ @Override
+ protected void stepInstanceCount() {
+ instCount.incrementAndGet();
+ checkedInstance++;
+ }
+
+ @Override
+ protected String from() {
+ return String.format("%s, %s", startOffset, instance);
+ }
+
+ @Override
+ protected String fromTo() {
+ return String.format("%s, %s, %s", startOffset, endOffset, instance);
+ }
+ }
+
+ /**
+ * Evaluating {@code coord:latest()} and {@code coord:latestRange()} data input dependencies.
+ */
+ static final class LatestEvaluator extends RangeEvaluator {
+
+ LatestEvaluator(final int startOffset, final int endOffset) {
+ super("latest", startOffset, endOffset);
+ }
+
+ @Override
+ protected Calendar getNominalInstance() {
+ final boolean useCurrentTime = ConfigurationService.getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
+ if (useCurrentTime) {
+ return getCurrentInstance(new Date(), instCount);
+ }
+ else {
+ return getCurrentInstance(getActualTime(), instCount);
+ }
+ }
+
+ @Override
+ protected boolean isAvailable(final Calendar nominalInstance, final Calendar initInstance) {
+ return nominalInstance.compareTo(initInstance) >= 0;
+ }
+
+ @Override
+ protected boolean isFirst() {
+ return available == startOffset;
+ }
+
+ @Override
+ protected boolean isInBetween() {
+ return available <= endOffset;
+ }
+
+ @Override
+ protected void stepAvailable() {
+ available--;
+ }
+
+ @Override
+ protected void stepInstanceCount() {
+ instCount.decrementAndGet();
+ }
+
+ @Override
+ protected String from() {
+ return String.format("%s", startOffset);
+ }
+
+ @Override
+ protected String fromTo() {
+ return String.format("%s, %s", startOffset, endOffset);
+ }
+ }
+
+ private static abstract class RangeEvaluator {
+ /**
+ * What is the use case: evaluating {@code future} or {@code latest} dataset occurrences.
+ */
+ private final String type;
+
+ /**
+ * Start and end offsets of the evaluated range, as given to the constructor.
+ */
+ final int startOffset;
+ final int endOffset;
+
+ final AtomicInteger instCount = new AtomicInteger(0);
+ int available = 0;
+
+ RangeEvaluator(final String type, final int startOffset, final int endOffset) {
+ this.type = type;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
+ }
+
+ /**
+ * Evaluate the input data dependency's EL function based on HDFS checks for URI existence, or leave it unevaluated.
+ * <p>
+ * Based on following:
+ * <ul>
+ * <li>{@code startOffset}</li>
+ * <li>{@code endOffset}</li>
+ * <li>internal state like {@code instCount} and {@code available}</li>
+ * </ul>
+ * @return timestamp of the {@code future} / {@code latest} HDFS URI based on parameters, and HDFS URI presence, or the
+ * unmodified input EL expression, if not present.
+ * @throws Exception in case of wrong arguments, or HDFS access errors
+ */
+ String evaluate() throws Exception {
+ final Thread currentThread = Thread.currentThread();
+ final ELEvaluator eval = ELEvaluator.getCurrent();
+ String retVal = "";
+ final int datasetFrequency = getDSFrequency();// in minutes
+ final TimeUnit dsTimeUnit = getDSTimeUnit();
+ instCount.set(0);
+
+ Calendar nominalInstance = getNominalInstance();
+
+ final StringBuilder resolvedInstances = new StringBuilder();
+ final StringBuilder resolvedURIPaths = new StringBuilder();
+ if (nominalInstance != null) {
+ final Calendar initInstance = getInitialInstanceCal();
+ SyncCoordDataset ds = (SyncCoordDataset) eval.getVariable(DATASET);
+ if (ds == null) {
+ throw new RuntimeException("Associated Dataset should be defined with key " + DATASET);
+ }
+ final String uriTemplate = ds.getUriTemplate();
+ final Configuration conf = (Configuration) eval.getVariable(CONFIGURATION);
+ if (conf == null) {
+ throw new RuntimeException("Associated Configuration should be defined with key " + CONFIGURATION);
+ }
+
+ reset();
+
+ boolean resolved = false;
+ final String user = ParamChecker
+ .notEmpty((String) eval.getVariable(OozieClient.USER_NAME), OozieClient.USER_NAME);
+ final String doneFlag = ds.getDoneFlag();
+ final URIHandlerService uriService = Services.get().get(URIHandlerService.class);
+ URIHandler uriHandler = null;
+ Context uriContext = null;
+ try {
+ int retries = 0;
+ final DateFormat dMYHMS = new SimpleDateFormat("yyyy.MM.dd HH:mm:ss");
+ final long maxRetries = new OozieTimeUnitConverter().convertMillis(
+ (nominalInstance.getTime().getTime() - initInstance.getTime().getTime()) / ds.getFrequency(),
+ dsTimeUnit);
+ if (maxRetries > 0) {
+ LOG.debug("Approximately [{0}] maximal retries going back till [{1}] for checking latest " +
+ "dataset existence. [name={2}]",
+ maxRetries,
+ dMYHMS.format(initInstance.getTime()),
+ ds.getName());
+ }
+ while (isAvailable(nominalInstance, initInstance) && !currentThread.isInterrupted()) {
+ final ELEvaluator uriEval = getUriEvaluator(nominalInstance);
+ final String uriPath = uriEval.evaluate(uriTemplate, String.class);
+ if (uriHandler == null) {
+ URI uri = new URI(uriPath);
+ uriHandler = uriService.getURIHandler(uri);
+ uriContext = uriHandler.getContext(uri, conf, user, true);
+ }
+ final String uriWithDoneFlag = uriHandler.getURIWithDoneFlag(uriPath, doneFlag);
+ LOG.trace("Checking dataset existence. [name={0};uriWithDoneFlag={1};retries={2};nominalTime={3}]",
+ ds.getName(),
+ uriWithDoneFlag,
+ retries,
+ dMYHMS.format(nominalInstance.getTime()));
+ final Date now = new Date();
+ final boolean uriWithDoneFlagExists = uriHandler.exists(new URI(uriWithDoneFlag), uriContext);
+ final Date later = new Date();
+ LOG.trace("[{0}] ms elapsed while checking for dataset existence. [name={1};uriWithDoneFlag={2}]",
+ later.getTime() - now.getTime(),
+ ds.getName(),
+ uriWithDoneFlag);
+ if (uriWithDoneFlagExists) {
+ LOG.debug("Found current dataset for {0}({1}). [name={2};uriWithDoneFlag={3}",
+ type,
+ available,
+ ds.getName(),
+ uriWithDoneFlag);
+ if (isFirst()) {
+ LOG.debug("Matched first dataset for {0}({1}), resolving. [name={2};uriWithDoneFlag={3}]",
+ type,
+ available,
+ ds.getName(),
+ uriWithDoneFlag);
+ resolved = true;
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance));
+ resolvedURIPaths.append(uriPath);
+ retVal = resolvedInstances.toString();
+ eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
+
+ break;
+ }
+ else if (isInBetween()) {
+ LOG.debug("Matched dataset in between for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
+ type,
+ available,
+ ds.getName(),
+ uriWithDoneFlag);
+ resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstance)).append(
+ INSTANCE_SEPARATOR);
+ resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
+ }
+ else {
+ LOG.debug("Not matching dataset for {0}({1}), continuing. [name={2};uriWithDoneFlag={3}]",
+ type,
+ available,
+ ds.getName(),
+ uriWithDoneFlag);
+ }
+
+ stepAvailable();
+ }
+ else {
+ LOG.trace("Could not find dataset. [name={0};uriWithDoneFlag={1}]",
+ ds.getName(),
+ uriWithDoneFlag);
+ }
+
+ stepInstanceCount();
+
+ nominalInstance = (Calendar) initInstance.clone();
+ nominalInstance.add(dsTimeUnit.getCalendarUnit(), instCount.get() * datasetFrequency);
+
+ retries++;
+ }
+ if (!StringUtils.isEmpty(resolvedURIPaths.toString())
+ && eval.getVariable(CoordELConstants.RESOLVED_PATH) == null) {
+ eval.setVariable(CoordELConstants.RESOLVED_PATH, resolvedURIPaths.toString());
+ }
+ }
+ finally {
+ if (uriContext != null) {
+ uriContext.destroy();
+ }
+ }
+ if (!resolved) {
+ // return unchanged function with variable 'is_resolved'
+ // to 'false'
+ eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
+ if (startOffset == endOffset) {
+ retVal = String.format("${coord:%s(%s)}", type, from());
+ }
+ else {
+ retVal = String.format("${coord:%sRange(%s)}", type, fromTo());
+ }
+ }
+ else {
+ eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.TRUE);
+ }
+ }
+ else {// No feasible nominal time
+ eval.setVariable(CoordELConstants.IS_RESOLVED, Boolean.FALSE);
+ }
+
+ return retVal;
+
+ }
+
+ /**
+ * Get the nominal {@link Calendar} instance, that is, the base from which dataset occurrences are evaluated.
+ * @return a {@link Calendar} instance based on {@link #type}, or {@code null} if there is no feasible nominal time
+ */
+ protected abstract Calendar getNominalInstance();
+
+ /**
+ * Reset the evaluator's internal state between two {@link #evaluate()} calls.
+ */
+ protected void reset() {
+ available = 0;
+ }
+
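+ /**
+ * Checks whether there are more dataset instances to examine; {@link #evaluate()} loops while this holds.
+ * @param nominalInstance the nominal instance that is about to be checked
+ * @param initInstance the initial instance of the dataset
+ * @return {@code true} if {@code nominalInstance} should still be checked, {@code false} to stop the evaluation
+ */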
+ protected abstract boolean isAvailable(Calendar nominalInstance, Calendar initInstance);
+
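+ /**
+ * Checks whether the current match is the one that resolves the evaluation, after which {@link #evaluate()} stops.
+ * @return {@code true} if the current match completes the evaluation
+ */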
+ protected abstract boolean isFirst();
+
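+ /**
+ * Checks whether the current match does not complete the evaluation yet, but is a valid part of the requested range.
+ * @return {@code true} if the current match has to be collected before continuing with the next instance
+ */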
+ protected abstract boolean isInBetween();
+
+ /**
+ * Modify the internal state in case of a match.
+ */
+ protected abstract void stepAvailable();
+
+ /**
+ * Modify the internal state in case an instance was checked.
+ */
+ protected abstract void stepInstanceCount();
+
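+ /**
+ * Substitution of the parameters when rebuilding an unresolved single-instance expression (start equals end offset).
+ * @return the parameter list to put between the parentheses of the EL function
+ */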
+ protected abstract String from();
+
+
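+ /**
+ * Substitution of the parameters when rebuilding an unresolved range expression (start differs from end offset).
+ * @return the parameter list to put between the parentheses of the {@code Range} EL function
+ */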
+ protected abstract String fromTo();
+ }
+
+ @VisibleForTesting
+ static class OozieTimeUnitConverter {
+ /**
+ * Convert {@code millis} to a whole number of {@code source} units; a year counts as 365 days, a month as 31 days.
+ * @param millis the duration in milliseconds
+ * @param source the Oozie dataset time unit to count in, must not be {@code null}
+ * @return the estimated occurrence count of a dataset, or -1 if {@code source} is not supported
+ */
+ long convertMillis(final long millis, final TimeUnit source) {
+ Preconditions.checkNotNull(source, "source has to be filled");
+
+ switch (source) {
+ case YEAR:
+ return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 365;
+ case MONTH:
+ return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS) / 31;
+ case DAY:
+ return java.util.concurrent.TimeUnit.DAYS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+ case HOUR:
+ return java.util.concurrent.TimeUnit.HOURS.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+ case MINUTE:
+ return java.util.concurrent.TimeUnit.MINUTES.convert(millis, java.util.concurrent.TimeUnit.MILLISECONDS);
+ default:
+ return -1;
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
index 796d19c..2880641 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/CoordELExtensions.java
@@ -19,6 +19,7 @@
package org.apache.oozie.command.coord;
import java.util.Calendar;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.util.ELEvaluator;
@@ -45,19 +46,19 @@ public class CoordELExtensions {
dsInstanceCal.set(Calendar.SECOND, 0);
dsInstanceCal.set(Calendar.MILLISECOND, 0);
- int[] instCnt = new int[1];
+ final AtomicInteger instCnt = new AtomicInteger(0);
Calendar compInstCal = CoordELFunctions
.getCurrentInstance(dsInstanceCal.getTime(), instCnt);
if (compInstCal == null) {
return "";
}
- int dsInstanceCnt = instCnt[0];
+ int dsInstanceCnt = instCnt.get();
compInstCal = CoordELFunctions.getCurrentInstance(nominalInstanceCal.getTime(), instCnt);
if (compInstCal == null) {
return "";
}
- int nominalInstanceCnt = instCnt[0];
+ int nominalInstanceCnt = instCnt.get();
return "coord:current(" + (dsInstanceCnt - nominalInstanceCnt) + ")";
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
new file mode 100644
index 0000000..c31c5cc
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/TestOozieTimeUnitConverter.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestOozieTimeUnitConverter { // exercises CoordELFunctions.OozieTimeUnitConverter#convertMillis; NOTE(review): MONTH and YEAR are not covered
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none(); // no exception expected unless a test says so
+
+ @Test
+ public void whenSourceIsNullNPEIsThrown() { // a null unit must be rejected with a NullPointerException
+ expectedException.expect(NullPointerException.class);
+
+ new CoordELFunctions.OozieTimeUnitConverter().convertMillis(0, null);
+ }
+
+ @Test
+ public void whenSourceIsNotRecognized() { // units without a conversion yield the -1 sentinel
+ assertMillisConverted(-1, 0, TimeUnit.END_OF_DAY);
+ assertMillisConverted(-1, 0, TimeUnit.END_OF_WEEK);
+ assertMillisConverted(-1, 0, TimeUnit.END_OF_MONTH);
+ assertMillisConverted(-1, 0, TimeUnit.CRON);
+ assertMillisConverted(-1, 0, TimeUnit.NONE);
+ }
+
+ @Test
+ public void whenSourceMillisAreConvertedToMinutesCorrectly() { // partial minutes truncate to 0 for either sign; 60_000 ms is one minute
+ assertMillisConverted(0, 1, TimeUnit.MINUTE);
+ assertMillisConverted(0, -1, TimeUnit.MINUTE);
+ assertMillisConverted(1, 60_000, TimeUnit.MINUTE);
+ assertMillisConverted(-1, -60_000, TimeUnit.MINUTE);
+ }
+
+ @Test
+ public void whenSourceMillisAreConvertedToHoursCorrectly() { // partial hours truncate to 0 for either sign; 3_600_000 ms is one hour
+ assertMillisConverted(0, 1, TimeUnit.HOUR);
+ assertMillisConverted(0, -1, TimeUnit.HOUR);
+ assertMillisConverted(1, 3_600_000, TimeUnit.HOUR);
+ assertMillisConverted(-1, -3_600_000, TimeUnit.HOUR);
+ }
+
+ @Test
+ public void whenSourceMillisAreConvertedToDaysCorrectly() { // partial days truncate to 0 for either sign; 86_400_000 ms is one day
+ assertMillisConverted(0, 1, TimeUnit.DAY);
+ assertMillisConverted(0, -1, TimeUnit.DAY);
+ assertMillisConverted(1, 86_400_000, TimeUnit.DAY);
+ assertMillisConverted(-1, -86_400_000, TimeUnit.DAY);
+ }
+
+ private void assertMillisConverted(final long expectedTUCount, final long millis, final TimeUnit oozieTU) { // converts with a fresh converter and compares against the expected unit count
+ assertEquals(String.format("%d millis are converted to %s correctly", millis, oozieTU.name()),
+ expectedTUCount,
+ new CoordELFunctions.OozieTimeUnitConverter().convertMillis(millis, oozieTU));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/27e4bf16/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ae54814..1ebbdd4 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.2.0 release (trunk - unreleased)
+OOZIE-3381 [coordinator] Enhance logging of CoordElFunctions (andras.piros via kmarton)
OOZIE-3380 TestCoordMaterializeTransitionXCommand failure after DST change date (asalamon74 via kmarton)
OOZIE-3338 [build] Remove SVN references (asalamon74 via andras.piros)
OOZIE-3378 [core] Coordinator action's status is SUBMITTED after E1003 error (asalamon74 via andras.piros)