You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 21:46:37 UTC
[5/5] oozie git commit: OOZIE-1976 Specifying coordinator input
datasets in more logical ways
OOZIE-1976 Specifying coordinator input datasets in more logical ways
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/81ce22b6
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/81ce22b6
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/81ce22b6
Branch: refs/heads/master
Commit: 81ce22b6f23b2bba49df4733961ee82b58c38d0d
Parents: 5abd3e6
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 12:46:16 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 12:46:16 2016 -0800
----------------------------------------------------------------------
.../main/resources/oozie-coordinator-0.5.xsd | 194 ++++
core/pom.xml | 5 +
.../org/apache/oozie/CoordinatorActionBean.java | 46 +-
.../main/java/org/apache/oozie/ErrorCode.java | 2 +
.../coord/CoordActionInputCheckXCommand.java | 318 ++----
.../CoordActionUpdatePushMissingDependency.java | 30 +-
.../oozie/command/coord/CoordCommandUtils.java | 180 ++-
.../CoordMaterializeTransitionXCommand.java | 6 +-
.../coord/CoordPushDependencyCheckXCommand.java | 73 +-
.../command/coord/CoordSubmitXCommand.java | 27 +
.../apache/oozie/coord/CoordELConstants.java | 3 +
.../apache/oozie/coord/CoordELEvaluator.java | 23 +-
.../apache/oozie/coord/CoordELFunctions.java | 49 +-
.../java/org/apache/oozie/coord/CoordUtils.java | 22 +
.../org/apache/oozie/coord/SyncCoordAction.java | 22 +
.../AbstractCoordInputDependency.java | 315 ++++++
.../input/dependency/CoordInputDependency.java | 172 +++
.../dependency/CoordInputDependencyFactory.java | 170 +++
.../input/dependency/CoordInputInstance.java | 83 ++
.../dependency/CoordOldInputDependency.java | 309 +++++
.../dependency/CoordPullInputDependency.java | 151 +++
.../dependency/CoordPushInputDependency.java | 49 +
.../CoordUnResolvedInputDependency.java | 92 ++
.../input/logic/CoordInputLogicBuilder.java | 167 +++
.../input/logic/CoordInputLogicEvaluator.java | 44 +
.../logic/CoordInputLogicEvaluatorPhaseOne.java | 324 ++++++
.../CoordInputLogicEvaluatorPhaseThree.java | 130 +++
.../logic/CoordInputLogicEvaluatorPhaseTwo.java | 144 +++
.../CoordInputLogicEvaluatorPhaseValidate.java | 89 ++
.../logic/CoordInputLogicEvaluatorResult.java | 104 ++
.../logic/CoordInputLogicEvaluatorUtil.java | 229 ++++
.../coord/input/logic/InputLogicParser.java | 309 +++++
.../coord/input/logic/OozieJexlEngine.java | 47 +
.../coord/input/logic/OozieJexlInterpreter.java | 73 ++
.../oozie/dependency/ActionDependency.java | 2 +-
.../oozie/dependency/DependencyChecker.java | 15 +-
.../apache/oozie/dependency/FSURIHandler.java | 9 +
.../apache/oozie/dependency/HCatURIHandler.java | 5 +
.../org/apache/oozie/dependency/URIHandler.java | 14 +
.../org/apache/oozie/util/WritableUtils.java | 148 ++-
core/src/main/resources/oozie-default.xml | 2 +-
.../TestCoordActionInputCheckXCommand.java | 9 +-
.../input/logic/TestCoordInputLogicPush.java | 645 +++++++++++
.../input/logic/TestCoordinatorInputLogic.java | 1054 ++++++++++++++++++
.../coord/input/logic/TestInputLogicParser.java | 367 ++++++
core/src/test/resources/coord-action-sla.xml | 2 +-
.../test/resources/coord-inputlogic-combine.xml | 119 ++
.../test/resources/coord-inputlogic-hcat.xml | 119 ++
.../test/resources/coord-inputlogic-latest.xml | 124 +++
.../resources/coord-inputlogic-range-latest.xml | 130 +++
.../test/resources/coord-inputlogic-range.xml | 107 ++
core/src/test/resources/coord-inputlogic.xml | 126 +++
pom.xml | 7 +
release-log.txt | 1 +
54 files changed, 6625 insertions(+), 381 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/client/src/main/resources/oozie-coordinator-0.5.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-coordinator-0.5.xsd b/client/src/main/resources/oozie-coordinator-0.5.xsd
new file mode 100644
index 0000000..2b63629
--- /dev/null
+++ b/client/src/main/resources/oozie-coordinator-0.5.xsd
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.5"
+ elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.5">
+
+ <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
+ <xs:element name="datasets" type="coordinator:DATASETS"/>
+ <xs:simpleType name="IDENTIFIER">
+ <xs:restriction base="xs:string">
+ <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/>
+ </xs:restriction>
+ </xs:simpleType>
+ <xs:complexType name="COORDINATOR-APP">
+ <xs:sequence>
+ <xs:element name="parameters" type="coordinator:PARAMETERS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="input-logic" type="coordinator:INPUTLOGIC" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="xs:string" use="required"/>
+ <xs:attribute name="frequency" type="xs:string" use="required"/>
+ <xs:attribute name="start" type="xs:string" use="required"/>
+ <xs:attribute name="end" type="xs:string" use="required"/>
+ <xs:attribute name="timezone" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="PARAMETERS">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+ <xs:complexType name="CONTROLS">
+ <xs:sequence minOccurs="0" maxOccurs="1">
+ <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+ <xs:complexType name="DATASETS">
+ <xs:sequence minOccurs="0" maxOccurs="1">
+ <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="async-dataset" type="coordinator:ASYNCDATASET" minOccurs="0" maxOccurs="1"/>
+ </xs:choice>
+ </xs:sequence>
+ </xs:complexType>
+ <xs:complexType name="SYNCDATASET">
+ <xs:sequence>
+ <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+ <xs:attribute name="frequency" type="xs:string" use="required"/>
+ <xs:attribute name="initial-instance" type="xs:string" use="required"/>
+ <xs:attribute name="timezone" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="ASYNCDATASET">
+ <xs:sequence>
+ <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+ <xs:attribute name="sequence-type" type="xs:string" use="required"/>
+ <xs:attribute name="initial-version" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="INPUTEVENTS">
+ <xs:choice minOccurs="1" maxOccurs="1">
+ <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
+ </xs:choice>
+ </xs:complexType>
+ <xs:complexType name="INPUTLOGIC">
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+ </xs:choice>
+ </xs:complexType>
+ <xs:complexType name="LOGICALAND">
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:choice>
+ <xs:attribute name="name" type="xs:string" use="optional"/>
+ <xs:attribute name="min" type="xs:string" use="optional"/>
+ <xs:attribute name="wait" type="xs:string" use="optional"/>
+ </xs:complexType>
+ <xs:complexType name="LOGICALOR">
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:choice>
+ <xs:attribute name="name" type="xs:string" use="optional"/>
+ <xs:attribute name="min" type="xs:string" use="optional"/>
+ <xs:attribute name="wait" type="xs:string" use="optional"/>
+ </xs:complexType>
+ <xs:complexType name="COMBINE">
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="2" maxOccurs="unbounded"/>
+ </xs:choice>
+ <xs:attribute name="name" type="xs:string" use="optional"/>
+ <xs:attribute name="min" type="xs:string" use="optional"/>
+ <xs:attribute name="wait" type="xs:string" use="optional"/>
+ </xs:complexType>
+ <xs:complexType name="LOGICALDATAIN">
+ <xs:attribute name="name" type="xs:string" use="optional"/>
+ <xs:attribute name="min" type="xs:string" use="optional"/>
+ <xs:attribute name="wait" type="xs:string" use="optional"/>
+ <xs:attribute name="dataset" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="DATAIN">
+ <xs:choice minOccurs="1" maxOccurs="1">
+ <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:sequence minOccurs="1" maxOccurs="1">
+ <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:choice>
+ <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+ <xs:attribute name="dataset" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="OUTPUTEVENTS">
+ <xs:sequence minOccurs="1" maxOccurs="1">
+ <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+ <xs:complexType name="DATAOUT">
+ <xs:sequence minOccurs="1" maxOccurs="1">
+ <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+ <xs:attribute name="dataset" type="xs:string" use="required"/>
+ </xs:complexType>
+ <xs:complexType name="ACTION">
+ <xs:sequence minOccurs="1" maxOccurs="1">
+ <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+ <xs:complexType name="WORKFLOW">
+ <xs:sequence>
+ <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="FLAG"/>
+ <xs:complexType name="CONFIGURATION">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+</xs:schema>
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index b063dab..b72ea7d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -282,6 +282,11 @@
<version>3.4</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-jexl</artifactId>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.apache.oozie</groupId>
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 91bff4d..b1be7c9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -34,12 +34,15 @@ import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
+import javax.persistence.Transient;
import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
@@ -285,6 +288,13 @@ public class CoordinatorActionBean implements
return toJSONObject("GMT");
}
+ @Transient
+ private CoordInputDependency coordPushInputDependency;
+
+ @Transient
+ private CoordInputDependency coordPullInputDependency;
+
+
public CoordinatorActionBean() {
}
@@ -745,23 +755,21 @@ public class CoordinatorActionBean implements
json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
- json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils
- .formatDateRfc822(getCreatedTime(), timeZoneId));
- json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils
- .formatDateRfc822(getNominalTime(), timeZoneId));
+ json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
+ json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId));
json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
- json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils
- .formatDateRfc822(getLastModifiedTime(), timeZoneId));
+ json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME,
+ JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
// json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
// .formatDateRfc822(startTime), timeZoneId);
// json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
// .formatDateRfc822(endTime), timeZoneId);
- json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies());
- json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies());
+ json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies());
+ json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies());
json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
@@ -818,5 +826,27 @@ public class CoordinatorActionBean implements
return true;
}
+ public CoordInputDependency getPullInputDependencies() {
+ if (coordPullInputDependency == null) {
+ coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies);
+ }
+ return coordPullInputDependency;
+
+ }
+
+ public CoordInputDependency getPushInputDependencies() {
+ if (coordPushInputDependency == null) {
+ coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies);
+ }
+ return coordPushInputDependency;
+ }
+
+ public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) {
+ this.coordPullInputDependency = coordPullInputDependency;
+ }
+
+ public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) {
+ this.coordPushInputDependency = coordPushInputDependency;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 6c1e399..2907ca2 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -214,6 +214,8 @@ public enum ErrorCode {
E1025(XLog.STD, "Coord status transit error: [{0}]"),
E1026(XLog.STD, "SLA alert update command failed: {0}"),
E1027(XLog.STD, "SLA change command failed. {0}"),
+ E1028(XLog.STD, "Coord input logic error. {0}"),
+
E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 11184d1..640d3cb 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -20,13 +20,9 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.oozie.CoordinatorActionBean;
@@ -34,14 +30,11 @@ import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
-import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.dependency.URIHandler;
-import org.apache.oozie.dependency.URIHandlerException;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
@@ -54,7 +47,6 @@ import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
@@ -159,40 +151,38 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
StringBuilder existList = new StringBuilder();
StringBuilder nonExistList = new StringBuilder();
+ CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+ CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+
+
+ String missingDependencies = coordPullInputDependency.getMissingDependencies();
StringBuilder nonResolvedList = new StringBuilder();
- String firstMissingDependency = "";
- String missingDeps = coordAction.getMissingDependencies();
- CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
+ CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
+ String firstMissingDependency = "";
// For clarity regarding which is the missing dependency in synchronous order
// instead of printing entire list, some of which, may be available
- if(nonExistList.length() > 0) {
+ if (nonExistList.length() > 0) {
firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
}
LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
+ nonResolvedList.toString());
- // Updating the list of data dependencies that are available and those that are yet not
- boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
- String pushDeps = coordAction.getPushMissingDependencies();
- // Resolve latest/future only when all current missingDependencies and
- // pushMissingDependencies are met
+
+
+ boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
+ String nonExistListStr = nonExistList.toString();
+ boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
if (status && nonResolvedList.length() > 0) {
- status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
- : false;
+ status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false;
}
coordAction.setLastModifiedTime(currentTime);
coordAction.setActionXml(actionXml.toString());
- if (nonResolvedList.length() > 0 && status == false) {
- nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
- }
- String nonExistListStr = nonExistList.toString();
- if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
- // missingDeps null or empty means action should become READY
- isChangeInDependency = true;
- coordAction.setMissingDependencies(nonExistListStr);
- }
- if (status && (pushDeps == null || pushDeps.length() == 0)) {
- String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
+
+ isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
+
+ if (status && isPushDependenciesMet) {
+ String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId,
+ coordPullInputDependency, coordPushInputDependency);
actionXml.replace(0, actionXml.length(), newActionXml);
coordAction.setActionXml(actionXml.toString());
coordAction.setStatus(CoordinatorAction.Status.READY);
@@ -207,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
updateCoordAction(coordAction, isChangeInDependency);
}
else {
- if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
+ if (!nonExistListStr.isEmpty() && isPushDependenciesMet) {
queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
}
else {
@@ -246,10 +236,25 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
return null;
}
+ private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
+ StringBuilder nonResolvedList, boolean status) throws IOException {
+ if (nonResolvedList.length() > 0 && status == false) {
+ nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
+ }
+ return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
+ nonResolvedList, status);
+ }
- static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception {
+ static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
+ throws Exception {
+ return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
+ }
+
+ static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
+ CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml.toString());
- ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId);
+ ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
+ pushDependencies);
materializeDataProperties(eAction, actionConf, eval);
return XmlUtils.prettyPrint(eAction).toString();
}
@@ -268,6 +273,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
if (jpaService != null) {
try {
if (isChangeInDependency) {
+ coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
CoordActionQueryExecutor.getInstance().executeUpdate(
CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
@@ -281,12 +287,11 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
}
}
- catch (JPAExecutorException jex) {
+ catch (Exception jex) {
throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
}
}
}
-
/**
* This function reads the value of re-queue interval for coordinator input
* check command from the Oozie configuration provided by Configuration
@@ -310,16 +315,26 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
* @return true if all input paths are existed
* @throws Exception thrown of unable to check input path
*/
- protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
+ protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
Configuration conf) throws Exception {
- Element eAction = XmlUtils.parseXml(actionXml.toString());
- return checkResolvedUris(eAction, existList, nonExistList, conf);
+ return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
+ nonExistList);
}
- protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+ /**
+ * Check un resolved input.
+ *
+ * @param coordAction the coord action
+ * @param actionXml the action xml
+ * @param conf the conf
+ * @return true, if successful
+ * @throws Exception the exception
+ */
+ protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
+ Configuration conf) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml.toString());
LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
- boolean allExist = checkUnresolvedInstances(eAction, conf);
+ boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
if (allExist) {
actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
}
@@ -327,6 +342,18 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
}
/**
+ * Check un resolved input.
+ *
+ * @param actionXml the action xml
+ * @param conf the conf
+ * @return true, if successful
+ * @throws Exception the exception
+ */
+ protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+ return checkUnResolvedInput(coordAction, actionXml, conf);
+ }
+
+ /**
* Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
* of files that will be needed.
*
@@ -378,222 +405,23 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
* @throws Exception thrown if failed to resolve data input and output paths
*/
@SuppressWarnings("unchecked")
- private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
- String strAction = XmlUtils.prettyPrint(eAction).toString();
- Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
- String actualTimeStr = eAction.getAttributeValue("action-actual-time");
- Date actualTime = null;
- if (actualTimeStr == null) {
- LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
- "from previous version. Assign current date to actual time, action = " + actionId);
- actualTime = new Date();
- } else {
- actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
- }
+ private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
+ Configuration actionConf) throws Exception {
- StringBuffer resultedXml = new StringBuffer();
-
- boolean ret;
- Element inputList = eAction.getChild("input-events", eAction.getNamespace());
- if (inputList != null) {
- ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
- actualTime, actionConf);
- if (ret == false) {
- resultedXml.append(strAction);
- return false;
- }
- }
+ boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
// Using latest() or future() in output-event is not intuitive.
// We need to make sure, this assumption is correct.
Element outputList = eAction.getChild("output-events", eAction.getNamespace());
if (outputList != null) {
for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
- if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
+ if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
" not permitted in output-event ");
}
}
}
- return true;
- }
-
- /**
- * Resolve the list of data input paths
- *
- * @param eDataEvents the list of data input elements
- * @param nominalTime action nominal time
- * @param actualTime current time
- * @param conf action configuration
- * @return true if all unresolved URIs can be resolved
- * @throws Exception thrown if failed to resolve data input paths
- */
- @SuppressWarnings("unchecked")
- private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
- Configuration conf) throws Exception {
- for (Element dEvent : eDataEvents) {
- if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
- continue;
- }
- ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
- String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
- String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
- StringBuffer resolvedTmp = new StringBuffer();
- for (int i = 0; i < unresolvedList.length; i++) {
- String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
- Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
- if (isResolved == false) {
- LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
- return false;
- }
- if (resolvedTmp.length() > 0) {
- resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
- }
- resolvedTmp.append((String) eval.getVariable("resolved_path"));
- }
- if (resolvedTmp.length() > 0) {
- if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
- resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
- dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
- dEvent.removeChild("uris", dEvent.getNamespace());
- }
- Element uriInstance = new Element("uris", dEvent.getNamespace());
- uriInstance.addContent(resolvedTmp.toString());
- dEvent.getContent().add(1, uriInstance);
- }
- dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
- }
-
- return true;
- }
-
- /**
- * Check all resolved URIs existence
- *
- * @param eAction action element
- * @param existList the list of existed paths
- * @param nonExistList the list of paths to check existence
- * @param conf action configuration
- * @return true if all nonExistList paths exist
- * @throws IOException thrown if unable to access the path
- */
- private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
- Configuration conf) throws IOException {
- Element inputList = eAction.getChild("input-events", eAction.getNamespace());
- if (inputList != null) {
- if (nonExistList.length() > 0) {
- checkListOfPaths(existList, nonExistList, conf);
- }
- return nonExistList.length() == 0;
- }
- return true;
- }
-
- /**
- * Check a list of non existed paths and add to exist list if it exists
- *
- * @param existList the list of existed paths
- * @param nonExistList the list of paths to check existence
- * @param conf action configuration
- * @return true if all nonExistList paths exist
- * @throws IOException thrown if unable to access the path
- */
- private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
- throws IOException {
-
- String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
- if (uriList[0] != null) {
- LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
- }
-
- nonExistList.delete(0, nonExistList.length());
- boolean allExists = true;
- String existSeparator = "", nonExistSeparator = "";
- String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
- for (int i = 0; i < uriList.length; i++) {
- if (allExists) {
- allExists = pathExists(uriList[i], conf, user);
- LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
- }
- if (allExists) {
- existList.append(existSeparator).append(uriList[i]);
- existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
- }
- else {
- nonExistList.append(nonExistSeparator).append(uriList[i]);
- nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
- }
- }
- return allExists;
- }
-
- /**
- * Check if given path exists
- *
- * @param sPath uri path
- * @param actionConf action configuration
- * @return true if path exists
- * @throws IOException thrown if unable to access the path
- */
- protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
- LOG.debug("checking for the file " + sPath);
- try {
- URI uri = new URI(sPath);
- URIHandlerService service = Services.get().get(URIHandlerService.class);
- URIHandler handler = service.getURIHandler(uri);
- return handler.exists(uri, actionConf, user);
- }
- catch (URIHandlerException e) {
- coordAction.setErrorCode(e.getErrorCode().toString());
- coordAction.setErrorMessage(e.getMessage());
- if (e.getCause() != null && e.getCause() instanceof AccessControlException) {
- throw (AccessControlException) e.getCause();
- }
- else {
- throw new IOException(e);
- }
- }
- catch (URISyntaxException e) {
- coordAction.setErrorCode(ErrorCode.E0906.toString());
- coordAction.setErrorMessage(e.getMessage());
- throw new IOException(e);
- }
- }
-
- /**
- * The function create a list of URIs separated by "," using the instances time stamp and URI-template
- *
- * @param event : <data-in> event
- * @param instances : List of time stamp seprated by ","
- * @param unresolvedInstances : list of instance with latest/future function
- * @return : list of URIs separated by ",".
- * @throws Exception thrown if failed to create URIs from unresolvedInstances
- */
- @SuppressWarnings("unused")
- private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
- if (instances == null || instances.length() == 0) {
- return "";
- }
- String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
- StringBuilder uris = new StringBuilder();
-
- for (int i = 0; i < instanceList.length; i++) {
- int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
- if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
- if (unresolvedInstances.length() > 0) {
- unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
- }
- unresolvedInstances.append(instanceList[i]);
- continue;
- }
- ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
- if (uris.length() > 0) {
- uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
- }
- uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
- "uri-template", event.getNamespace()).getTextTrim()));
- }
- return uris.toString();
+ return ret;
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
index 4e1c5b3..cb866e2 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
@@ -18,11 +18,9 @@
package org.apache.oozie.command.coord;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
@@ -35,9 +33,11 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC
@Override
protected Void execute() throws CommandException {
+ CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+ CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+
LOG.info("STARTED for Action id [{0}]", actionId);
- String pushMissingDeps = coordAction.getPushMissingDependencies();
- if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+ if (coordPushInputDependency.isDependencyMet()) {
LOG.info("Nothing to check. Empty push missing dependency");
}
else {
@@ -50,25 +50,19 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC
}
}
else {
- LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
- pushMissingDeps);
-
- String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
- List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
- stillMissingDepsList.removeAll(availDepList);
+ String pushMissingDependencies = coordPushInputDependency.getMissingDependencies().toString();
+ LOG.debug("Updating with available uris = [{0}] where missing uris = [{1}]", pushMissingDependencies);
+ String[] missingDependenciesArray = DependencyChecker.dependenciesAsArray(pushMissingDependencies);
+ coordPushInputDependency.addToAvailableDependencies(availDepList);
boolean isChangeInDependency = true;
- if (stillMissingDepsList.size() == 0) {
+ if (coordPushInputDependency.isDependencyMet()) {
// All push-based dependencies are available
- onAllPushDependenciesAvailable();
+ onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
}
else {
- if (stillMissingDepsList.size() == missingDepsArray.length) {
+ if (coordPushInputDependency.getMissingDependenciesAsList().size() == missingDependenciesArray.length) {
isChangeInDependency = false;
}
- else {
- String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
- coordAction.setPushMissingDependencies(stillMissingDeps);
- }
if (isTimeout()) { // Poll and check as one last try
queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
index 58ef483..0af7edc 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
@@ -18,9 +18,12 @@
package org.apache.oozie.command.coord;
+import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
+import java.net.URISyntaxException;
import java.text.ParseException;
+import java.util.ArrayList;
import java.util.TimeZone;
import java.util.Map;
import java.util.HashMap;
@@ -32,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
@@ -39,17 +43,25 @@ import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
+import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
+import org.apache.oozie.coord.input.dependency.CoordInputInstance;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandler.DependencyType;
+import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
+import org.jdom.Attribute;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.quartz.CronExpression;
@@ -63,8 +75,9 @@ public class CoordCommandUtils {
public static int OFFSET = 3;
public static int ABSOLUTE = 4;
public static int UNEXPECTED = -1;
+
public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
- public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
+ public static final String UNRESOLVED_INSTANCES_TAG = "unresolved-instances";
/**
* parse a function like coord:latest(n)/future() and return the 'n'.
@@ -357,7 +370,7 @@ public class CoordCommandUtils {
depList.append(urisWithDoneFlag);
}
if (unresolvedInstances.length() > 0) {
- Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
+ Element elemInstance = new Element(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
elemInstance.addContent(unresolvedInstances.toString());
event.getContent().add(1, elemInstance);
}
@@ -482,20 +495,24 @@ public class CoordCommandUtils {
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
- Map<String, StringBuilder> dependencyMap = null;
+ boolean isInputLogicSpecified = CoordUtils.isInputLogicSpecified(eAction);
Element inputList = eAction.getChild("input-events", eAction.getNamespace());
List<Element> dataInList = null;
if (inputList != null) {
dataInList = inputList.getChildren("data-in", eAction.getNamespace());
- dependencyMap = materializeDataEvents(dataInList, appInst, conf);
+ materializeInputDataEvents(dataInList, appInst, conf, actionBean, isInputLogicSpecified);
}
+ if(isInputLogicSpecified){
+ evaluateInputCheck(eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()),
+ CoordELEvaluator.createDataEvaluator(eAction, conf, actionId));
+ }
Element outputList = eAction.getChild("output-events", eAction.getNamespace());
List<Element> dataOutList = null;
if (outputList != null) {
dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
- materializeDataEvents(dataOutList, appInst, conf);
+ materializeOutputDataEvents(dataOutList, appInst, conf);
}
eAction.removeAttribute("start");
@@ -513,16 +530,6 @@ public class CoordCommandUtils {
actionBean.setLastModifiedTime(new Date());
actionBean.setStatus(CoordinatorAction.Status.WAITING);
actionBean.setActionNumber(instanceCount);
- if (dependencyMap != null) {
- StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name());
- if (sbPull != null) {
- actionBean.setMissingDependencies(sbPull.toString());
- }
- StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name());
- if (sbPush != null) {
- actionBean.setPushMissingDependencies(sbPush.toString());
- }
- }
actionBean.setNominalTime(nominalTime);
boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf);
if (isSla == true) {
@@ -544,6 +551,7 @@ public class CoordCommandUtils {
}
}
+
/**
* @param eAction the actionXml related element
* @param actionBean the coordinator action bean
@@ -554,12 +562,18 @@ public class CoordCommandUtils {
String action = XmlUtils.prettyPrint(eAction).toString();
StringBuilder actionXml = new StringBuilder(action);
Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+ actionBean.setActionXml(action);
+
+ if (CoordUtils.isInputLogicSpecified(eAction)) {
+ new CoordInputLogicEvaluatorUtil(actionBean).validateInputLogic();
+ }
boolean isPushDepAvailable = true;
- if (actionBean.getPushMissingDependencies() != null) {
- ActionDependency actionDep = DependencyChecker.checkForAvailability(
- actionBean.getPushMissingDependencies(), actionConf, true);
- if (actionDep.getMissingDependencies().size() != 0) {
+ String pushMissingDependencies = actionBean.getPushInputDependencies().getMissingDependencies();
+ if (pushMissingDependencies != null) {
+ ActionDependency actionDependencies = DependencyChecker.checkForAvailability(pushMissingDependencies,
+ actionConf, true);
+ if (actionDependencies.getMissingDependencies().size() != 0) {
isPushDepAvailable = false;
}
@@ -571,13 +585,16 @@ public class CoordCommandUtils {
StringBuilder existList = new StringBuilder();
StringBuilder nonExistList = new StringBuilder();
StringBuilder nonResolvedList = new StringBuilder();
- getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
- isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+ getResolvedList(actionBean.getPullInputDependencies().getMissingDependencies(), nonExistList, nonResolvedList);
+ isPullDepAvailable = actionBean.getPullInputDependencies().checkPullMissingDependencies(actionBean,
+ existList, nonExistList);
+
}
if (isPullDepAvailable && isPushDepAvailable) {
// Check for latest/future
- boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf);
+ boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionBean, actionXml,
+ actionConf);
if (isLatestFutureDepAvailable) {
String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
actionBean.getId());
@@ -598,17 +615,68 @@ public class CoordCommandUtils {
* @param conf
* @throws Exception
*/
- public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf
- ) throws Exception {
+ private static void materializeOutputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf)
+ throws Exception {
if (events == null) {
- return null;
+ return;
+ }
+
+ for (Element event : events) {
+ StringBuilder instances = new StringBuilder();
+ ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
+ // Handle list of instance tag
+ resolveInstances(event, instances, appInst, conf, eval);
+ // Handle start-instance and end-instance
+ resolveInstanceRange(event, instances, appInst, conf, eval);
+ // Separate out the unresolved instances
+ separateResolvedAndUnresolved(event, instances);
+
+ }
+ }
+
+ private static void evaluateInputCheck(Element root, ELEvaluator evalInputLogic) throws Exception {
+ for (Object event : root.getChildren()) {
+ Element inputElement = (Element) event;
+
+ resolveAttribute("dataset", inputElement, evalInputLogic);
+ resolveAttribute("name", inputElement, evalInputLogic);
+ resolveAttribute("min", inputElement, evalInputLogic);
+ resolveAttribute("wait", inputElement, evalInputLogic);
+ if (!inputElement.getChildren().isEmpty()) {
+ evaluateInputCheck(inputElement, evalInputLogic);
+ }
}
- StringBuilder unresolvedList = new StringBuilder();
- Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>();
+ }
+
+ private static String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
+ Attribute attr = elem.getAttribute(attrName);
+ String val = null;
+ if (attr != null) {
+ try {
+ val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
+ }
+ catch (Exception e) {
+ throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
+ }
+ attr.setValue(val);
+ }
+ return val;
+ }
+
+ public static void materializeInputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
+ CoordinatorActionBean actionBean, boolean isInputLogicSpecified) throws Exception {
+
+ if (events == null) {
+ return;
+ }
+ CoordInputDependency coordPullInputDependency = CoordInputDependencyFactory
+ .createPullInputDependencies(isInputLogicSpecified);
+ CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory
+ .createPushInputDependencies(isInputLogicSpecified);
+ Map<String, String> unresolvedList = new HashMap<String, String>();
+
URIHandlerService uriService = Services.get().get(URIHandlerService.class);
- StringBuilder pullMissingDep = null;
- StringBuilder pushMissingDep = null;
for (Element event : events) {
StringBuilder instances = new StringBuilder();
@@ -619,41 +687,44 @@ public class CoordCommandUtils {
resolveInstanceRange(event, instances, appInst, conf, eval);
// Separate out the unresolved instances
String resolvedList = separateResolvedAndUnresolved(event, instances);
+ String name = event.getAttribute("name").getValue();
+
if (!resolvedList.isEmpty()) {
Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
event.getNamespace());
+
String uriTemplate = uri.getText();
URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
URIHandler handler = uriService.getURIHandler(baseURI);
+ List<CoordInputInstance> inputInstanceList = new ArrayList<CoordInputInstance>();
+
+ for (String inputInstance : resolvedList.split("#")) {
+ inputInstanceList.add(new CoordInputInstance(inputInstance, false));
+ }
+
if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
- pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append(
- CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+ coordPullInputDependency.addInputInstanceList(name, inputInstanceList);
}
else {
- pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append(
- CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+ coordPushInputDependency.addInputInstanceList(name, inputInstanceList);
+
}
}
- String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
+ String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
if (tmpUnresolved != null) {
- if (unresolvedList.length() > 0) {
- unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
- }
- unresolvedList.append(tmpUnresolved);
+ unresolvedList.put(name, tmpUnresolved);
}
}
- if (unresolvedList.length() > 0) {
- if (pullMissingDep == null) {
- pullMissingDep = new StringBuilder();
- }
- pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
+ for(String unresolvedDatasetName:unresolvedList.keySet()){
+ coordPullInputDependency.addUnResolvedList(unresolvedDatasetName, unresolvedList.get(unresolvedDatasetName));
}
- dependencyMap.put(DependencyType.PULL.name(), pullMissingDep);
- dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep);
- return dependencyMap;
- }
+ actionBean.setPullInputDependencies(coordPullInputDependency);
+ actionBean.setPushInputDependencies(coordPushInputDependency);
+ actionBean.setMissingDependencies(coordPullInputDependency.serialize());
+ actionBean.setPushMissingDependencies(coordPushInputDependency.serialize());
+ }
/**
* Get resolved string from missDepList
*
@@ -797,4 +868,19 @@ public class CoordCommandUtils {
}
return nextNominalTime;
}
+
+ public static boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException,
+ URISyntaxException, URIHandlerException {
+ URI uri = new URI(sPath);
+ URIHandlerService service = Services.get().get(URIHandlerService.class);
+ URIHandler handler = service.getURIHandler(uri);
+ return handler.exists(uri, actionConf, user);
+ }
+
+ public static boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
+ URIHandlerException {
+ String user = ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
+ return pathExists(sPath, actionConf, user);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 39e6ac1..f6c1782 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -18,6 +18,7 @@
package org.apache.oozie.command.coord;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
@@ -34,6 +35,7 @@ import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
@@ -148,7 +150,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
Math.max(checkDelay, 0));
- if (coordAction.getPushMissingDependencies() != null) {
+ if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) {
// TODO: Delay in catchup mode?
queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
}
@@ -485,7 +487,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
nextTime, actualTime, lastActionNumber, jobConf, actionBean);
actionBean.setTimeOut(timeout);
-
if (!dryrun) {
storeToDB(actionBean, action, jobConf); // Storing to table
@@ -529,7 +530,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
+ actionXml.length());
actionBean.setActionXml(actionXml);
-
insertList.add(actionBean);
writeActionSlaRegistration(actionXml, actionBean, jobConf);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index b05344d..2600a2b 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -21,10 +21,10 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
-import java.util.Arrays;
import java.util.Date;
import java.util.List;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
@@ -34,7 +34,7 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
@@ -113,14 +113,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
return null;
}
- String pushMissingDeps = coordAction.getPushMissingDependencies();
- if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+ CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+ CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+ if (coordPushInputDependency.getMissingDependenciesAsList().size() == 0) {
LOG.info("Nothing to check. Empty push missing dependency");
}
else {
- String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
- LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
- LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
+ List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList();
+ LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0));
+ LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray);
if (registerForNotification) {
LOG.debug("Register for notifications is true");
}
@@ -134,27 +135,27 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
}
+
+ boolean isChangeInDependency = true;
+ boolean timeout = false;
+ ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction,
+ registerForNotification);
// Check all dependencies during materialization to avoid registering in the cache.
// But check only first missing one afterwards similar to
// CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
- ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
- !registerForNotification);
+ if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) {
+ isChangeInDependency = false;
+ }
+ else {
+ coordPushInputDependency.setMissingDependencies(StringUtils.join(
+ actionDependency.getMissingDependencies(), CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR));
+ }
- boolean isChangeInDependency = true;
- boolean timeout = false;
- if (actionDep.getMissingDependencies().size() == 0) {
+ if (coordPushInputDependency.isDependencyMet()) {
// All push-based dependencies are available
- onAllPushDependenciesAvailable();
+ onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
}
else {
- if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
- isChangeInDependency = false;
- }
- else {
- String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
- .getMissingDependencies());
- coordAction.setPushMissingDependencies(stillMissingDeps);
- }
// Checking for timeout
timeout = isTimeout();
if (timeout) {
@@ -166,15 +167,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
}
}
- updateCoordAction(coordAction, isChangeInDependency);
+ updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet());
if (registerForNotification) {
- registerForNotification(actionDep.getMissingDependencies(), actionConf);
+ registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf);
}
if (removeAvailDependencies) {
- unregisterAvailableDependencies(actionDep.getAvailableDependencies());
+ unregisterAvailableDependencies(actionDependency.getAvailableDependencies());
}
if (timeout) {
- unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
+ unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId);
}
}
catch (Exception e) {
@@ -183,10 +184,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
LOG.debug("Queueing timeout command");
// XCommand.queue() will not work when there is a Exception
callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
- unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
+ unregisterMissingDependencies(missingDependenciesArray, actionId);
}
- else if (coordAction.getMissingDependencies() != null
- && coordAction.getMissingDependencies().length() > 0) {
+ else if (coordPullInputDependency.getMissingDependenciesAsList().size() > 0) {
// Queue again on exception as RecoveryService will not queue this again with
// the action being updated regularly by CoordActionInputCheckXCommand
callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
@@ -221,18 +221,18 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
return (timeOut >= 0) && (waitingTime > timeOut);
}
- protected void onAllPushDependenciesAvailable() throws CommandException {
- coordAction.setPushMissingDependencies("");
+ protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException {
Services.get().get(PartitionDependencyManagerService.class)
.removeCoordActionWithDependenciesAvailable(coordAction.getId());
- if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
+ if (isPullDependencyMeet) {
Date nominalTime = coordAction.getNominalTime();
Date currentTime = new Date();
// The action should become READY only if current time > nominal time;
// CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
if (nominalTime.compareTo(currentTime) > 0) {
LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
- + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
+ + DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
+ + DateUtils.formatDateOozieTZ(nominalTime));
}
else {
String actionXml = resolveCoordConfiguration();
@@ -248,6 +248,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
// wait till RecoveryService kicks in
queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
}
+ coordAction.getPushInputDependencies().setDependencyMet(true);
+
}
private String resolveCoordConfiguration() throws CommandException {
@@ -255,7 +257,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
- actionId);
+ actionId, coordAction.getPullInputDependencies(), coordAction
+ .getPushInputDependencies());
actionXml.replace(0, actionXml.length(), newActionXml);
return actionXml.toString();
}
@@ -270,6 +273,7 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
if (jpaService != null) {
try {
if (isChangeInDependency) {
+ coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize());
CoordActionQueryExecutor.getInstance().executeUpdate(
CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
@@ -286,6 +290,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
}
+ catch (IOException ioe) {
+ throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index d4d1c08..f1f9ab2 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -53,8 +53,10 @@ import org.apache.oozie.command.SubmitTransitionXCommand;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.CoordUtils;
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CoordMaterializeTriggerService;
@@ -799,6 +801,11 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
resolveIODataset(eAppXml);
resolveIOEvents(eAppXml, dataNameList);
+ if (CoordUtils.isInputLogicSpecified(eAppXml)) {
+ resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst,
+ dataNameList);
+ }
+
resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
eAppXml.getNamespace()), evalNofuncs);
// TODO: If action or workflow tag is missing, NullPointerException will
@@ -896,6 +903,26 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
}
+ private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList)
+ throws Exception {
+ for (Object event : root.getChildren()) {
+ Element inputElement = (Element) event;
+ resolveAttribute("dataset", inputElement, evalInputLogic);
+ String name=resolveAttribute("name", inputElement, evalInputLogic);
+ resolveAttribute("or", inputElement, evalInputLogic);
+ resolveAttribute("and", inputElement, evalInputLogic);
+ resolveAttribute("combine", inputElement, evalInputLogic);
+
+ if (name != null) {
+ dataNameList.put(name, "data-in");
+ }
+
+ if (!inputElement.getChildren().isEmpty()) {
+ resolveInputLogic(inputElement, evalInputLogic, dataNameList);
+ }
+ }
+ }
+
/**
* Resolve input-events/dataset and output-events/dataset tags.
*
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
index f010a81..eabf473 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
@@ -33,4 +33,7 @@ public class CoordELConstants {
public static final int SUBMIT_DAYS = 24 * 60;
public static final String DEFAULT_DONE_FLAG = "_SUCCESS";
+ final public static String RESOLVED_PATH = "resolved_path";
+
+ final public static String IS_RESOLVED = "is_resolved";
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
index 8b2f456..fba8ac1 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
@@ -28,6 +28,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
@@ -141,7 +143,7 @@ public class CoordELEvaluator {
uris = uris.replaceAll(CoordELFunctions.INSTANCE_SEPARATOR, CoordELFunctions.DIR_SEPARATOR);
eval.setVariable(".dataout." + data.getAttributeValue("name"), uris);
}
- if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+ if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
eval.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true");
}
}
@@ -172,7 +174,13 @@ public class CoordELEvaluator {
* @return configured ELEvaluator
* @throws Exception : If there is any date-time string in wrong format, the exception is thrown
*/
+
public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId) throws Exception {
+ return createDataEvaluator(eJob, conf, actionId, null, null);
+ }
+
+ public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId,
+ CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
ELEvaluator e = Services.get().get(ELService.class).createEvaluator("coord-action-start");
setConfigToEval(e, conf);
SyncCoordAction appInst = new SyncCoordAction();
@@ -184,6 +192,12 @@ public class CoordELEvaluator {
appInst.setTimeUnit(TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")));
appInst.setActionId(actionId);
appInst.setName(eJob.getAttributeValue("name"));
+ appInst.setPullDependencies(pullDependencies);
+ appInst.setPushDependencies(pushDependencies);
+ if (CoordUtils.isInputLogicSpecified(eJob)) {
+ e.setVariable(".actionInputLogic",
+ XmlUtils.prettyPrint(eJob.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eJob.getNamespace())).toString());
+ }
}
String strActualTime = eJob.getAttributeValue("action-actual-time");
if (strActualTime != null) {
@@ -200,11 +214,14 @@ public class CoordELEvaluator {
}
else {
}
- if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+ if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
e.setVariable(".datain." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
// check
// null
}
+ Element doneFlagElement = data.getChild("done-flag", data.getNamespace());
+ String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
+ e.setVariable(".datain." + data.getAttributeValue("name") + ".doneFlag", doneFlag);
}
}
events = eJob.getChild("output-events", eJob.getNamespace());
@@ -217,7 +234,7 @@ public class CoordELEvaluator {
}
else {
}// TODO
- if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+ if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
e.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
// check
// null