You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 21:46:37 UTC

[5/5] oozie git commit: OOZIE-1976 Specifying coordinator input datasets in more logical ways

OOZIE-1976 Specifying coordinator input datasets in more logical ways


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/81ce22b6
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/81ce22b6
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/81ce22b6

Branch: refs/heads/master
Commit: 81ce22b6f23b2bba49df4733961ee82b58c38d0d
Parents: 5abd3e6
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Tue Jan 26 12:46:16 2016 -0800
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Tue Jan 26 12:46:16 2016 -0800

----------------------------------------------------------------------
 .../main/resources/oozie-coordinator-0.5.xsd    |  194 ++++
 core/pom.xml                                    |    5 +
 .../org/apache/oozie/CoordinatorActionBean.java |   46 +-
 .../main/java/org/apache/oozie/ErrorCode.java   |    2 +
 .../coord/CoordActionInputCheckXCommand.java    |  318 ++----
 .../CoordActionUpdatePushMissingDependency.java |   30 +-
 .../oozie/command/coord/CoordCommandUtils.java  |  180 ++-
 .../CoordMaterializeTransitionXCommand.java     |    6 +-
 .../coord/CoordPushDependencyCheckXCommand.java |   73 +-
 .../command/coord/CoordSubmitXCommand.java      |   27 +
 .../apache/oozie/coord/CoordELConstants.java    |    3 +
 .../apache/oozie/coord/CoordELEvaluator.java    |   23 +-
 .../apache/oozie/coord/CoordELFunctions.java    |   49 +-
 .../java/org/apache/oozie/coord/CoordUtils.java |   22 +
 .../org/apache/oozie/coord/SyncCoordAction.java |   22 +
 .../AbstractCoordInputDependency.java           |  315 ++++++
 .../input/dependency/CoordInputDependency.java  |  172 +++
 .../dependency/CoordInputDependencyFactory.java |  170 +++
 .../input/dependency/CoordInputInstance.java    |   83 ++
 .../dependency/CoordOldInputDependency.java     |  309 +++++
 .../dependency/CoordPullInputDependency.java    |  151 +++
 .../dependency/CoordPushInputDependency.java    |   49 +
 .../CoordUnResolvedInputDependency.java         |   92 ++
 .../input/logic/CoordInputLogicBuilder.java     |  167 +++
 .../input/logic/CoordInputLogicEvaluator.java   |   44 +
 .../logic/CoordInputLogicEvaluatorPhaseOne.java |  324 ++++++
 .../CoordInputLogicEvaluatorPhaseThree.java     |  130 +++
 .../logic/CoordInputLogicEvaluatorPhaseTwo.java |  144 +++
 .../CoordInputLogicEvaluatorPhaseValidate.java  |   89 ++
 .../logic/CoordInputLogicEvaluatorResult.java   |  104 ++
 .../logic/CoordInputLogicEvaluatorUtil.java     |  229 ++++
 .../coord/input/logic/InputLogicParser.java     |  309 +++++
 .../coord/input/logic/OozieJexlEngine.java      |   47 +
 .../coord/input/logic/OozieJexlInterpreter.java |   73 ++
 .../oozie/dependency/ActionDependency.java      |    2 +-
 .../oozie/dependency/DependencyChecker.java     |   15 +-
 .../apache/oozie/dependency/FSURIHandler.java   |    9 +
 .../apache/oozie/dependency/HCatURIHandler.java |    5 +
 .../org/apache/oozie/dependency/URIHandler.java |   14 +
 .../org/apache/oozie/util/WritableUtils.java    |  148 ++-
 core/src/main/resources/oozie-default.xml       |    2 +-
 .../TestCoordActionInputCheckXCommand.java      |    9 +-
 .../input/logic/TestCoordInputLogicPush.java    |  645 +++++++++++
 .../input/logic/TestCoordinatorInputLogic.java  | 1054 ++++++++++++++++++
 .../coord/input/logic/TestInputLogicParser.java |  367 ++++++
 core/src/test/resources/coord-action-sla.xml    |    2 +-
 .../test/resources/coord-inputlogic-combine.xml |  119 ++
 .../test/resources/coord-inputlogic-hcat.xml    |  119 ++
 .../test/resources/coord-inputlogic-latest.xml  |  124 +++
 .../resources/coord-inputlogic-range-latest.xml |  130 +++
 .../test/resources/coord-inputlogic-range.xml   |  107 ++
 core/src/test/resources/coord-inputlogic.xml    |  126 +++
 pom.xml                                         |    7 +
 release-log.txt                                 |    1 +
 54 files changed, 6625 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/client/src/main/resources/oozie-coordinator-0.5.xsd
----------------------------------------------------------------------
diff --git a/client/src/main/resources/oozie-coordinator-0.5.xsd b/client/src/main/resources/oozie-coordinator-0.5.xsd
new file mode 100644
index 0000000..2b63629
--- /dev/null
+++ b/client/src/main/resources/oozie-coordinator-0.5.xsd
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:coordinator="uri:oozie:coordinator:0.5"
+           elementFormDefault="qualified" targetNamespace="uri:oozie:coordinator:0.5">
+
+    <xs:element name="coordinator-app" type="coordinator:COORDINATOR-APP"/>
+    <xs:element name="datasets" type="coordinator:DATASETS"/>
+    <xs:simpleType name="IDENTIFIER">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([a-zA-Z]([\-_a-zA-Z0-9])*){1,39}"/>
+        </xs:restriction>
+    </xs:simpleType>
+    <xs:complexType name="COORDINATOR-APP">
+        <xs:sequence>
+            <xs:element name="parameters" type="coordinator:PARAMETERS" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="controls" type="coordinator:CONTROLS" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="datasets" type="coordinator:DATASETS" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="input-events" type="coordinator:INPUTEVENTS" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="input-logic" type="coordinator:INPUTLOGIC" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="output-events" type="coordinator:OUTPUTEVENTS" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="action" type="coordinator:ACTION" minOccurs="1" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="xs:string" use="required"/>
+        <xs:attribute name="frequency" type="xs:string" use="required"/>
+        <xs:attribute name="start" type="xs:string" use="required"/>
+        <xs:attribute name="end" type="xs:string" use="required"/>
+        <xs:attribute name="timezone" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="PARAMETERS">
+        <xs:sequence>
+            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                <xs:complexType>
+                    <xs:sequence>
+                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                    </xs:sequence>
+                </xs:complexType>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="CONTROLS">
+        <xs:sequence minOccurs="0" maxOccurs="1">
+            <xs:element name="timeout" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="concurrency" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="execution" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="throttle" type="xs:string" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="DATASETS">
+        <xs:sequence minOccurs="0" maxOccurs="1">
+            <xs:element name="include" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:choice minOccurs="0" maxOccurs="unbounded">
+                <xs:element name="dataset" type="coordinator:SYNCDATASET" minOccurs="0" maxOccurs="1"/>
+                <xs:element name="async-dataset" type="coordinator:ASYNCDATASET" minOccurs="0" maxOccurs="1"/>
+            </xs:choice>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="SYNCDATASET">
+        <xs:sequence>
+            <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="done-flag" type="xs:string" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+        <xs:attribute name="frequency" type="xs:string" use="required"/>
+        <xs:attribute name="initial-instance" type="xs:string" use="required"/>
+        <xs:attribute name="timezone" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="ASYNCDATASET">
+        <xs:sequence>
+            <xs:element name="uri-template" type="xs:string" minOccurs="1" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+        <xs:attribute name="sequence-type" type="xs:string" use="required"/>
+        <xs:attribute name="initial-version" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="INPUTEVENTS">
+        <xs:choice minOccurs="1" maxOccurs="1">
+            <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="data-in" type="coordinator:DATAIN" minOccurs="1" maxOccurs="unbounded"/>
+        </xs:choice>
+    </xs:complexType>
+    <xs:complexType name="INPUTLOGIC">
+        <xs:choice minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+        </xs:choice>
+    </xs:complexType>
+    <xs:complexType name="LOGICALAND">
+        <xs:choice minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+            <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:choice>
+        <xs:attribute name="name" type="xs:string" use="optional"/>
+        <xs:attribute name="min" type="xs:string" use="optional"/>
+        <xs:attribute name="wait" type="xs:string" use="optional"/>
+    </xs:complexType>
+    <xs:complexType name="LOGICALOR">
+        <xs:choice minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="and" type="coordinator:LOGICALAND" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="or" type="coordinator:LOGICALOR" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="1" maxOccurs="unbounded"/>
+            <xs:element name="combine" type="coordinator:COMBINE" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:choice>
+        <xs:attribute name="name" type="xs:string" use="optional"/>
+        <xs:attribute name="min" type="xs:string" use="optional"/>
+        <xs:attribute name="wait" type="xs:string" use="optional"/>
+    </xs:complexType>
+    <xs:complexType name="COMBINE">
+        <xs:choice minOccurs="0" maxOccurs="unbounded">
+            <xs:element name="data-in" type="coordinator:LOGICALDATAIN" minOccurs="2" maxOccurs="unbounded"/>
+        </xs:choice>
+        <xs:attribute name="name" type="xs:string" use="optional"/>
+        <xs:attribute name="min" type="xs:string" use="optional"/>
+        <xs:attribute name="wait" type="xs:string" use="optional"/>
+    </xs:complexType>
+    <xs:complexType name="LOGICALDATAIN">
+        <xs:attribute name="name" type="xs:string" use="optional"/>
+        <xs:attribute name="min" type="xs:string" use="optional"/>
+        <xs:attribute name="wait" type="xs:string" use="optional"/>
+        <xs:attribute name="dataset" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="DATAIN">
+        <xs:choice minOccurs="1" maxOccurs="1">
+            <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
+            <xs:sequence minOccurs="1" maxOccurs="1">
+                <xs:element name="start-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+                <xs:element name="end-instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            </xs:sequence>
+        </xs:choice>
+        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+        <xs:attribute name="dataset" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="OUTPUTEVENTS">
+        <xs:sequence minOccurs="1" maxOccurs="1">
+            <xs:element name="data-out" type="coordinator:DATAOUT" minOccurs="1" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="DATAOUT">
+        <xs:sequence minOccurs="1" maxOccurs="1">
+            <xs:element name="instance" type="xs:string" minOccurs="1" maxOccurs="1"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="coordinator:IDENTIFIER" use="required"/>
+        <xs:attribute name="dataset" type="xs:string" use="required"/>
+    </xs:complexType>
+    <xs:complexType name="ACTION">
+        <xs:sequence minOccurs="1" maxOccurs="1">
+            <xs:element name="workflow" type="coordinator:WORKFLOW" minOccurs="1" maxOccurs="1"/>
+            <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+    <xs:complexType name="WORKFLOW">
+        <xs:sequence>
+            <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="configuration" type="coordinator:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="FLAG"/>
+    <xs:complexType name="CONFIGURATION">
+        <xs:sequence>
+            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                <xs:complexType>
+                    <xs:sequence>
+                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                    </xs:sequence>
+                </xs:complexType>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+</xs:schema>

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index b063dab..b72ea7d 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -282,6 +282,11 @@
             <version>3.4</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-jexl</artifactId>
+            <scope>compile</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.oozie</groupId>

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 91bff4d..b1be7c9 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -34,12 +34,15 @@ import javax.persistence.Lob;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
 import javax.persistence.Table;
+import javax.persistence.Transient;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.JsonTags;
 import org.apache.oozie.client.rest.JsonUtils;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.WritableUtils;
 import org.apache.openjpa.persistence.jdbc.Index;
@@ -285,6 +288,13 @@ public class CoordinatorActionBean implements
         return toJSONObject("GMT");
     }
 
+    @Transient
+    private CoordInputDependency coordPushInputDependency;
+
+    @Transient
+    private CoordInputDependency coordPullInputDependency;
+
+
     public CoordinatorActionBean() {
     }
 
@@ -745,23 +755,21 @@ public class CoordinatorActionBean implements
         json.put(JsonTags.COORDINATOR_ACTION_TYPE, type);
         json.put(JsonTags.COORDINATOR_ACTION_NUMBER, actionNumber);
         json.put(JsonTags.COORDINATOR_ACTION_CREATED_CONF, getCreatedConf());
-        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils
-                .formatDateRfc822(getCreatedTime(), timeZoneId));
-        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils
-                .formatDateRfc822(getNominalTime(), timeZoneId));
+        json.put(JsonTags.COORDINATOR_ACTION_CREATED_TIME, JsonUtils.formatDateRfc822(getCreatedTime(), timeZoneId));
+        json.put(JsonTags.COORDINATOR_ACTION_NOMINAL_TIME, JsonUtils.formatDateRfc822(getNominalTime(), timeZoneId));
         json.put(JsonTags.COORDINATOR_ACTION_EXTERNALID, externalId);
         // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
         // .formatDateRfc822(startTime), timeZoneId);
         json.put(JsonTags.COORDINATOR_ACTION_STATUS, statusStr);
         json.put(JsonTags.COORDINATOR_ACTION_RUNTIME_CONF, getRunConf());
-        json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME, JsonUtils
-                .formatDateRfc822(getLastModifiedTime(), timeZoneId));
+        json.put(JsonTags.COORDINATOR_ACTION_LAST_MODIFIED_TIME,
+                JsonUtils.formatDateRfc822(getLastModifiedTime(), timeZoneId));
         // json.put(JsonTags.COORDINATOR_ACTION_START_TIME, JsonUtils
         // .formatDateRfc822(startTime), timeZoneId);
         // json.put(JsonTags.COORDINATOR_ACTION_END_TIME, JsonUtils
         // .formatDateRfc822(endTime), timeZoneId);
-        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getMissingDependencies());
-        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushMissingDependencies());
+        json.put(JsonTags.COORDINATOR_ACTION_MISSING_DEPS, getPullInputDependencies().getMissingDependencies());
+        json.put(JsonTags.COORDINATOR_ACTION_PUSH_MISSING_DEPS, getPushInputDependencies().getMissingDependencies());
         json.put(JsonTags.COORDINATOR_ACTION_EXTERNAL_STATUS, externalStatus);
         json.put(JsonTags.COORDINATOR_ACTION_TRACKER_URI, trackerUri);
         json.put(JsonTags.COORDINATOR_ACTION_CONSOLE_URL, consoleUrl);
@@ -818,5 +826,27 @@ public class CoordinatorActionBean implements
         return true;
     }
 
+    public CoordInputDependency getPullInputDependencies() {
+        if (coordPullInputDependency == null) {
+            coordPullInputDependency = CoordInputDependencyFactory.getPullInputDependencies(missingDependencies);
+        }
+        return coordPullInputDependency;
+
+    }
+
+    public CoordInputDependency getPushInputDependencies() {
+        if (coordPushInputDependency == null) {
+            coordPushInputDependency = CoordInputDependencyFactory.getPushInputDependencies(pushMissingDependencies);
+        }
+        return coordPushInputDependency;
+    }
+
+    public void setPullInputDependencies(CoordInputDependency coordPullInputDependency) {
+        this.coordPullInputDependency = coordPullInputDependency;
+    }
+
+    public void setPushInputDependencies(CoordInputDependency coordPushInputDependency) {
+        this.coordPushInputDependency = coordPushInputDependency;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java
index 6c1e399..2907ca2 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -214,6 +214,8 @@ public enum ErrorCode {
     E1025(XLog.STD, "Coord status transit error: [{0}]"),
     E1026(XLog.STD, "SLA alert update command failed: {0}"),
     E1027(XLog.STD, "SLA change command failed. {0}"),
+    E1028(XLog.STD, "Coord input logic error. {0}"),
+
 
 
     E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 11184d1..640d3cb 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -20,13 +20,9 @@ package org.apache.oozie.command.coord;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.text.ParseException;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.oozie.CoordinatorActionBean;
@@ -34,14 +30,11 @@ import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
-import org.apache.oozie.coord.TimeUnit;
-import org.apache.oozie.dependency.URIHandler;
-import org.apache.oozie.dependency.URIHandlerException;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
@@ -54,7 +47,6 @@ import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
-import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.LogUtils;
@@ -159,40 +151,38 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
 
             StringBuilder existList = new StringBuilder();
             StringBuilder nonExistList = new StringBuilder();
+            CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+            CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+
+
+            String missingDependencies = coordPullInputDependency.getMissingDependencies();
             StringBuilder nonResolvedList = new StringBuilder();
-            String firstMissingDependency = "";
-            String missingDeps = coordAction.getMissingDependencies();
-            CoordCommandUtils.getResolvedList(missingDeps, nonExistList, nonResolvedList);
 
+            CoordCommandUtils.getResolvedList(missingDependencies, nonExistList, nonResolvedList);
+            String firstMissingDependency = "";
             // For clarity regarding which is the missing dependency in synchronous order
             // instead of printing entire list, some of which, may be available
-            if(nonExistList.length() > 0) {
+            if (nonExistList.length() > 0) {
                 firstMissingDependency = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR)[0];
             }
             LOG.info("[" + actionId + "]::CoordActionInputCheck:: Missing deps:" + firstMissingDependency + " "
                     + nonResolvedList.toString());
-            // Updating the list of data dependencies that are available and those that are yet not
-            boolean status = checkInput(actionXml, existList, nonExistList, actionConf);
-            String pushDeps = coordAction.getPushMissingDependencies();
-            // Resolve latest/future only when all current missingDependencies and
-            // pushMissingDependencies are met
+
+
+            boolean status = checkResolvedInput(actionXml, existList, nonExistList, actionConf);
+            String nonExistListStr = nonExistList.toString();
+            boolean isPushDependenciesMet = coordPushInputDependency.isDependencyMet();
             if (status && nonResolvedList.length() > 0) {
-                status = (pushDeps == null || pushDeps.length() == 0) ? checkUnResolvedInput(actionXml, actionConf)
-                        : false;
+                status = (isPushDependenciesMet) ? checkUnResolvedInput(actionXml, actionConf) : false;
             }
             coordAction.setLastModifiedTime(currentTime);
             coordAction.setActionXml(actionXml.toString());
-            if (nonResolvedList.length() > 0 && status == false) {
-                nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
-            }
-            String nonExistListStr = nonExistList.toString();
-            if (!nonExistListStr.equals(missingDeps) || missingDeps.isEmpty()) {
-                // missingDeps null or empty means action should become READY
-                isChangeInDependency = true;
-                coordAction.setMissingDependencies(nonExistListStr);
-            }
-            if (status && (pushDeps == null || pushDeps.length() == 0)) {
-                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId);
+
+            isChangeInDependency = isChangeInDependency(nonExistList, missingDependencies, nonResolvedList, status);
+
+            if (status && isPushDependenciesMet) {
+                String newActionXml = resolveCoordConfiguration(actionXml, actionConf, actionId,
+                        coordPullInputDependency, coordPushInputDependency);
                 actionXml.replace(0, actionXml.length(), newActionXml);
                 coordAction.setActionXml(actionXml.toString());
                 coordAction.setStatus(CoordinatorAction.Status.READY);
@@ -207,7 +197,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
                 updateCoordAction(coordAction, isChangeInDependency);
             }
             else {
-                if (!nonExistListStr.isEmpty() && pushDeps == null || pushDeps.length() == 0) {
+                if (!nonExistListStr.isEmpty() && isPushDependenciesMet) {
                     queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
                 }
                 else {
@@ -246,10 +236,25 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
         return null;
     }
 
+    private boolean isChangeInDependency(StringBuilder nonExistList, String missingDependencies,
+            StringBuilder nonResolvedList, boolean status) throws IOException {
+        if (nonResolvedList.length() > 0 && status == false) {
+            nonExistList.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR).append(nonResolvedList);
+        }
+        return coordAction.getPullInputDependencies().isChangeInDependency(nonExistList, missingDependencies,
+                nonResolvedList, status);
+    }
 
-    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId) throws Exception {
+    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId)
+            throws Exception {
+        return resolveCoordConfiguration(actionXml, actionConf, actionId, null, null);
+    }
+
+    static String resolveCoordConfiguration(StringBuilder actionXml, Configuration actionConf, String actionId,
+            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml.toString());
-        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId);
+        ELEvaluator eval = CoordELEvaluator.createDataEvaluator(eAction, actionConf, actionId, pullDependencies,
+                pushDependencies);
         materializeDataProperties(eAction, actionConf, eval);
         return XmlUtils.prettyPrint(eAction).toString();
     }
@@ -268,6 +273,7 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
         if (jpaService != null) {
             try {
                 if (isChangeInDependency) {
+                    coordAction.setMissingDependencies(coordAction.getPullInputDependencies().serialize());
                     CoordActionQueryExecutor.getInstance().executeUpdate(
                             CoordActionQuery.UPDATE_COORD_ACTION_FOR_INPUTCHECK, coordAction);
                     if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
@@ -281,12 +287,11 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
                             CoordActionQuery.UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, coordAction);
                 }
             }
-            catch (JPAExecutorException jex) {
+            catch (Exception jex) {
                 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
             }
         }
     }
-
     /**
      * This function reads the value of re-queue interval for coordinator input
      * check command from the Oozie configuration provided by Configuration
@@ -310,16 +315,26 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
      * @return true if all input paths are existed
      * @throws Exception thrown of unable to check input path
      */
-    protected boolean checkInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
+    protected boolean checkResolvedInput(StringBuilder actionXml, StringBuilder existList, StringBuilder nonExistList,
             Configuration conf) throws Exception {
-        Element eAction = XmlUtils.parseXml(actionXml.toString());
-        return checkResolvedUris(eAction, existList, nonExistList, conf);
+        return coordAction.getPullInputDependencies().checkPullMissingDependencies(coordAction, existList,
+                nonExistList);
     }
 
-    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+    /**
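+     * Checks the unresolved (latest/future) input instances of the given coordinator action.
+     *
+     * @param coordAction the coordinator action whose pull input dependencies are checked
+     * @param actionXml the action XML; replaced in place with the re-serialized action element when the check succeeds
+     * @param conf the action configuration
+     * @return true, if successful
+     * @throws Exception if the action XML cannot be parsed or the check fails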
+     */
+    protected boolean checkUnResolvedInput(CoordinatorActionBean coordAction, StringBuilder actionXml,
+            Configuration conf) throws Exception {
         Element eAction = XmlUtils.parseXml(actionXml.toString());
         LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking Latest/future");
-        boolean allExist = checkUnresolvedInstances(eAction, conf);
+        boolean allExist = checkUnresolvedInstances(coordAction, eAction, conf);
         if (allExist) {
             actionXml.replace(0, actionXml.length(), XmlUtils.prettyPrint(eAction).toString());
         }
@@ -327,6 +342,18 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
     }
 
     /**
+     * Checks the unresolved input instances of this command's own coordinator action.
+     *
+     * @param actionXml the action xml
+     * @param conf the conf
+     * @return true, if successful
+     * @throws Exception the exception
+     */
+    protected boolean checkUnResolvedInput(StringBuilder actionXml, Configuration conf) throws Exception {
+        return checkUnResolvedInput(coordAction, actionXml, conf);
+    }
+
+    /**
      * Materialize data properties defined in <action> tag. it includes dataIn(<DS>) and dataOut(<DS>) it creates a list
      * of files that will be needed.
      *
@@ -378,222 +405,23 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
      * @throws Exception thrown if failed to resolve data input and output paths
      */
     @SuppressWarnings("unchecked")
-    private boolean checkUnresolvedInstances(Element eAction, Configuration actionConf) throws Exception {
-        String strAction = XmlUtils.prettyPrint(eAction).toString();
-        Date nominalTime = DateUtils.parseDateOozieTZ(eAction.getAttributeValue("action-nominal-time"));
-        String actualTimeStr = eAction.getAttributeValue("action-actual-time");
-        Date actualTime = null;
-        if (actualTimeStr == null) {
-            LOG.debug("Unable to get action-actual-time from action xml, this job is submitted " +
-            "from previous version. Assign current date to actual time, action = " + actionId);
-            actualTime = new Date();
-        } else {
-            actualTime = DateUtils.parseDateOozieTZ(actualTimeStr);
-        }
+    private boolean checkUnresolvedInstances(CoordinatorActionBean coordAction, Element eAction,
+            Configuration actionConf) throws Exception {
 
-        StringBuffer resultedXml = new StringBuffer();
-
-        boolean ret;
-        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
-        if (inputList != null) {
-            ret = materializeUnresolvedEvent(inputList.getChildren("data-in", eAction.getNamespace()), nominalTime,
-                    actualTime, actionConf);
-            if (ret == false) {
-                resultedXml.append(strAction);
-                return false;
-            }
-        }
+        boolean ret = coordAction.getPullInputDependencies().checkUnresolved(coordAction, eAction);
 
         // Using latest() or future() in output-event is not intuitive.
         // We need to make sure, this assumption is correct.
         Element outputList = eAction.getChild("output-events", eAction.getNamespace());
         if (outputList != null) {
             for (Element dEvent : (List<Element>) outputList.getChildren("data-out", eAction.getNamespace())) {
-                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) != null) {
+                if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, dEvent.getNamespace()) != null) {
                     throw new CommandException(ErrorCode.E1006, "coord:latest()/future()",
                             " not permitted in output-event ");
                 }
             }
         }
-        return true;
-    }
-
-    /**
-     * Resolve the list of data input paths
-     *
-     * @param eDataEvents the list of data input elements
-     * @param nominalTime action nominal time
-     * @param actualTime current time
-     * @param conf action configuration
-     * @return true if all unresolved URIs can be resolved
-     * @throws Exception thrown if failed to resolve data input paths
-     */
-    @SuppressWarnings("unchecked")
-    private boolean materializeUnresolvedEvent(List<Element> eDataEvents, Date nominalTime, Date actualTime,
-            Configuration conf) throws Exception {
-        for (Element dEvent : eDataEvents) {
-            if (dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()) == null) {
-                continue;
-            }
-            ELEvaluator eval = CoordELEvaluator.createLazyEvaluator(actualTime, nominalTime, dEvent, conf);
-            String uresolvedInstance = dEvent.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace()).getTextTrim();
-            String unresolvedList[] = uresolvedInstance.split(CoordELFunctions.INSTANCE_SEPARATOR);
-            StringBuffer resolvedTmp = new StringBuffer();
-            for (int i = 0; i < unresolvedList.length; i++) {
-                String ret = CoordELFunctions.evalAndWrap(eval, unresolvedList[i]);
-                Boolean isResolved = (Boolean) eval.getVariable("is_resolved");
-                if (isResolved == false) {
-                    LOG.info("[" + actionId + "]::Cannot resolve: " + ret);
-                    return false;
-                }
-                if (resolvedTmp.length() > 0) {
-                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR);
-                }
-                resolvedTmp.append((String) eval.getVariable("resolved_path"));
-            }
-            if (resolvedTmp.length() > 0) {
-                if (dEvent.getChild("uris", dEvent.getNamespace()) != null) {
-                    resolvedTmp.append(CoordELFunctions.INSTANCE_SEPARATOR).append(
-                            dEvent.getChild("uris", dEvent.getNamespace()).getTextTrim());
-                    dEvent.removeChild("uris", dEvent.getNamespace());
-                }
-                Element uriInstance = new Element("uris", dEvent.getNamespace());
-                uriInstance.addContent(resolvedTmp.toString());
-                dEvent.getContent().add(1, uriInstance);
-            }
-            dEvent.removeChild(CoordCommandUtils.UNRESOLVED_INST_TAG, dEvent.getNamespace());
-        }
-
-        return true;
-    }
-
-    /**
-     * Check all resolved URIs existence
-     *
-     * @param eAction action element
-     * @param existList the list of existed paths
-     * @param nonExistList the list of paths to check existence
-     * @param conf action configuration
-     * @return true if all nonExistList paths exist
-     * @throws IOException thrown if unable to access the path
-     */
-    private boolean checkResolvedUris(Element eAction, StringBuilder existList, StringBuilder nonExistList,
-            Configuration conf) throws IOException {
-        Element inputList = eAction.getChild("input-events", eAction.getNamespace());
-        if (inputList != null) {
-            if (nonExistList.length() > 0) {
-                checkListOfPaths(existList, nonExistList, conf);
-            }
-            return nonExistList.length() == 0;
-        }
-        return true;
-    }
-
-    /**
-     * Check a list of non existed paths and add to exist list if it exists
-     *
-     * @param existList the list of existed paths
-     * @param nonExistList the list of paths to check existence
-     * @param conf action configuration
-     * @return true if all nonExistList paths exist
-     * @throws IOException thrown if unable to access the path
-     */
-    private boolean checkListOfPaths(StringBuilder existList, StringBuilder nonExistList, Configuration conf)
-            throws IOException {
-
-        String[] uriList = nonExistList.toString().split(CoordELFunctions.INSTANCE_SEPARATOR);
-        if (uriList[0] != null) {
-            LOG.info("[" + actionId + "]::ActionInputCheck:: In checkListOfPaths: " + uriList[0] + " is Missing.");
-        }
-
-        nonExistList.delete(0, nonExistList.length());
-        boolean allExists = true;
-        String existSeparator = "", nonExistSeparator = "";
-        String user = ParamChecker.notEmpty(conf.get(OozieClient.USER_NAME), OozieClient.USER_NAME);
-        for (int i = 0; i < uriList.length; i++) {
-            if (allExists) {
-                allExists = pathExists(uriList[i], conf, user);
-                LOG.info("[" + actionId + "]::ActionInputCheck:: File:" + uriList[i] + ", Exists? :" + allExists);
-            }
-            if (allExists) {
-                existList.append(existSeparator).append(uriList[i]);
-                existSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
-            }
-            else {
-                nonExistList.append(nonExistSeparator).append(uriList[i]);
-                nonExistSeparator = CoordELFunctions.INSTANCE_SEPARATOR;
-            }
-        }
-        return allExists;
-    }
-
-    /**
-     * Check if given path exists
-     *
-     * @param sPath uri path
-     * @param actionConf action configuration
-     * @return true if path exists
-     * @throws IOException thrown if unable to access the path
-     */
-    protected boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException {
-        LOG.debug("checking for the file " + sPath);
-        try {
-            URI uri = new URI(sPath);
-            URIHandlerService service = Services.get().get(URIHandlerService.class);
-            URIHandler handler = service.getURIHandler(uri);
-            return handler.exists(uri, actionConf, user);
-        }
-        catch (URIHandlerException e) {
-            coordAction.setErrorCode(e.getErrorCode().toString());
-            coordAction.setErrorMessage(e.getMessage());
-            if (e.getCause() != null && e.getCause() instanceof AccessControlException) {
-                throw (AccessControlException) e.getCause();
-            }
-            else {
-                throw new IOException(e);
-            }
-        }
-        catch (URISyntaxException e) {
-            coordAction.setErrorCode(ErrorCode.E0906.toString());
-            coordAction.setErrorMessage(e.getMessage());
-            throw new IOException(e);
-        }
-    }
-
-    /**
-     * The function create a list of URIs separated by "," using the instances time stamp and URI-template
-     *
-     * @param event : <data-in> event
-     * @param instances : List of time stamp seprated by ","
-     * @param unresolvedInstances : list of instance with latest/future function
-     * @return : list of URIs separated by ",".
-     * @throws Exception thrown if failed to create URIs from unresolvedInstances
-     */
-    @SuppressWarnings("unused")
-    private String createURIs(Element event, String instances, StringBuilder unresolvedInstances) throws Exception {
-        if (instances == null || instances.length() == 0) {
-            return "";
-        }
-        String[] instanceList = instances.split(CoordELFunctions.INSTANCE_SEPARATOR);
-        StringBuilder uris = new StringBuilder();
-
-        for (int i = 0; i < instanceList.length; i++) {
-            int funcType = CoordCommandUtils.getFuncType(instanceList[i]);
-            if (funcType == CoordCommandUtils.LATEST || funcType == CoordCommandUtils.FUTURE) {
-                if (unresolvedInstances.length() > 0) {
-                    unresolvedInstances.append(CoordELFunctions.INSTANCE_SEPARATOR);
-                }
-                unresolvedInstances.append(instanceList[i]);
-                continue;
-            }
-            ELEvaluator eval = CoordELEvaluator.createURIELEvaluator(instanceList[i]);
-            if (uris.length() > 0) {
-                uris.append(CoordELFunctions.INSTANCE_SEPARATOR);
-            }
-            uris.append(CoordELFunctions.evalAndWrap(eval, event.getChild("dataset", event.getNamespace()).getChild(
-                    "uri-template", event.getNamespace()).getTextTrim()));
-        }
-        return uris.toString();
+        return ret;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
index 4e1c5b3..cb866e2 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
@@ -18,11 +18,9 @@
 
 package org.apache.oozie.command.coord;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.List;
 import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
 import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
@@ -35,9 +33,11 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC
 
     @Override
     protected Void execute() throws CommandException {
+        CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+        CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+
         LOG.info("STARTED for Action id [{0}]", actionId);
-        String pushMissingDeps = coordAction.getPushMissingDependencies();
-        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+        if (coordPushInputDependency.isDependencyMet()) {
             LOG.info("Nothing to check. Empty push missing dependency");
         }
         else {
@@ -50,25 +50,19 @@ public class CoordActionUpdatePushMissingDependency extends CoordPushDependencyC
                 }
             }
             else {
-                LOG.debug("Updating with available uris=[{0}] where missing uris=[{1}]", availDepList.toString(),
-                        pushMissingDeps);
-
-                String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
-                List<String> stillMissingDepsList = new ArrayList<String>(Arrays.asList(missingDepsArray));
-                stillMissingDepsList.removeAll(availDepList);
+                String pushMissingDependencies = coordPushInputDependency.getMissingDependencies().toString();
+                LOG.debug("Updating with available uris = [{0}] where missing uris = [{1}]", availDepList, pushMissingDependencies);
+                String[] missingDependenciesArray = DependencyChecker.dependenciesAsArray(pushMissingDependencies);
+                coordPushInputDependency.addToAvailableDependencies(availDepList);
                 boolean isChangeInDependency = true;
-                if (stillMissingDepsList.size() == 0) {
+                if (coordPushInputDependency.isDependencyMet()) {
                     // All push-based dependencies are available
-                    onAllPushDependenciesAvailable();
+                    onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
                 }
                 else {
-                    if (stillMissingDepsList.size() == missingDepsArray.length) {
+                    if (coordPushInputDependency.getMissingDependenciesAsList().size() == missingDependenciesArray.length) {
                         isChangeInDependency = false;
                     }
-                    else {
-                        String stillMissingDeps = DependencyChecker.dependenciesAsString(stillMissingDepsList);
-                        coordAction.setPushMissingDependencies(stillMissingDeps);
-                    }
                     if (isTimeout()) { // Poll and check as one last try
                         queue(new CoordPushDependencyCheckXCommand(coordAction.getId()), 100);
                     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
index 58ef483..0af7edc 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
@@ -18,9 +18,12 @@
 
 package org.apache.oozie.command.coord;
 
+import java.io.IOException;
 import java.io.StringReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.text.ParseException;
+import java.util.ArrayList;
 import java.util.TimeZone;
 import java.util.Map;
 import java.util.HashMap;
@@ -32,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
@@ -39,17 +43,25 @@ import org.apache.oozie.coord.CoordUtils;
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
+import org.apache.oozie.coord.input.dependency.CoordInputDependencyFactory;
+import org.apache.oozie.coord.input.dependency.CoordInputInstance;
 import org.apache.oozie.dependency.ActionDependency;
 import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.dependency.URIHandler.DependencyType;
+import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
+import org.jdom.Attribute;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.quartz.CronExpression;
@@ -63,8 +75,9 @@ public class CoordCommandUtils {
     public static int OFFSET = 3;
     public static int ABSOLUTE = 4;
     public static int UNEXPECTED = -1;
+
     public static final String RESOLVED_UNRESOLVED_SEPARATOR = "!!";
-    public static final String UNRESOLVED_INST_TAG = "unresolved-instances";
+    public static final String UNRESOLVED_INSTANCES_TAG = "unresolved-instances";
 
     /**
      * parse a function like coord:latest(n)/future() and return the 'n'.
@@ -357,7 +370,7 @@ public class CoordCommandUtils {
             depList.append(urisWithDoneFlag);
         }
         if (unresolvedInstances.length() > 0) {
-            Element elemInstance = new Element(UNRESOLVED_INST_TAG, event.getNamespace());
+            Element elemInstance = new Element(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
             elemInstance.addContent(unresolvedInstances.toString());
             event.getContent().add(1, elemInstance);
         }
@@ -482,20 +495,24 @@ public class CoordCommandUtils {
         appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
         appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
 
-        Map<String, StringBuilder> dependencyMap = null;
+        boolean isInputLogicSpecified = CoordUtils.isInputLogicSpecified(eAction);
 
         Element inputList = eAction.getChild("input-events", eAction.getNamespace());
         List<Element> dataInList = null;
         if (inputList != null) {
             dataInList = inputList.getChildren("data-in", eAction.getNamespace());
-            dependencyMap = materializeDataEvents(dataInList, appInst, conf);
+            materializeInputDataEvents(dataInList, appInst, conf, actionBean, isInputLogicSpecified);
         }
 
+        if(isInputLogicSpecified){
+            evaluateInputCheck(eAction.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAction.getNamespace()),
+                    CoordELEvaluator.createDataEvaluator(eAction, conf, actionId));
+        }
         Element outputList = eAction.getChild("output-events", eAction.getNamespace());
         List<Element> dataOutList = null;
         if (outputList != null) {
             dataOutList = outputList.getChildren("data-out", eAction.getNamespace());
-            materializeDataEvents(dataOutList, appInst, conf);
+            materializeOutputDataEvents(dataOutList, appInst, conf);
         }
 
         eAction.removeAttribute("start");
@@ -513,16 +530,6 @@ public class CoordCommandUtils {
         actionBean.setLastModifiedTime(new Date());
         actionBean.setStatus(CoordinatorAction.Status.WAITING);
         actionBean.setActionNumber(instanceCount);
-        if (dependencyMap != null) {
-            StringBuilder sbPull = dependencyMap.get(DependencyType.PULL.name());
-            if (sbPull != null) {
-                actionBean.setMissingDependencies(sbPull.toString());
-            }
-            StringBuilder sbPush = dependencyMap.get(DependencyType.PUSH.name());
-            if (sbPush != null) {
-                actionBean.setPushMissingDependencies(sbPush.toString());
-            }
-        }
         actionBean.setNominalTime(nominalTime);
         boolean isSla = CoordCommandUtils.materializeSLA(eAction, actionBean, conf);
         if (isSla == true) {
@@ -544,6 +551,7 @@ public class CoordCommandUtils {
         }
     }
 
+
     /**
      * @param eAction the actionXml related element
      * @param actionBean the coordinator action bean
@@ -554,12 +562,18 @@ public class CoordCommandUtils {
         String action = XmlUtils.prettyPrint(eAction).toString();
         StringBuilder actionXml = new StringBuilder(action);
         Configuration actionConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        actionBean.setActionXml(action);
+
+        if (CoordUtils.isInputLogicSpecified(eAction)) {
+            new CoordInputLogicEvaluatorUtil(actionBean).validateInputLogic();
+        }
 
         boolean isPushDepAvailable = true;
-        if (actionBean.getPushMissingDependencies() != null) {
-            ActionDependency actionDep = DependencyChecker.checkForAvailability(
-                    actionBean.getPushMissingDependencies(), actionConf, true);
-            if (actionDep.getMissingDependencies().size() != 0) {
+        String pushMissingDependencies = actionBean.getPushInputDependencies().getMissingDependencies();
+        if (pushMissingDependencies != null) {
+            ActionDependency actionDependencies = DependencyChecker.checkForAvailability(pushMissingDependencies,
+                    actionConf, true);
+            if (actionDependencies.getMissingDependencies().size() != 0) {
                 isPushDepAvailable = false;
             }
 
@@ -571,13 +585,16 @@ public class CoordCommandUtils {
             StringBuilder existList = new StringBuilder();
             StringBuilder nonExistList = new StringBuilder();
             StringBuilder nonResolvedList = new StringBuilder();
-            getResolvedList(actionBean.getMissingDependencies(), nonExistList, nonResolvedList);
-            isPullDepAvailable = coordActionInput.checkInput(actionXml, existList, nonExistList, actionConf);
+            getResolvedList(actionBean.getPullInputDependencies().getMissingDependencies(), nonExistList, nonResolvedList);
+            isPullDepAvailable = actionBean.getPullInputDependencies().checkPullMissingDependencies(actionBean,
+                    existList, nonExistList);
+
         }
 
         if (isPullDepAvailable && isPushDepAvailable) {
             // Check for latest/future
-            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionXml, actionConf);
+            boolean isLatestFutureDepAvailable = coordActionInput.checkUnResolvedInput(actionBean, actionXml,
+                    actionConf);
             if (isLatestFutureDepAvailable) {
                 String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
                         actionBean.getId());
@@ -598,17 +615,68 @@ public class CoordCommandUtils {
      * @param conf
      * @throws Exception
      */
-    public static Map<String, StringBuilder> materializeDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf
-            ) throws Exception {
+    private static void materializeOutputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf)
+            throws Exception {
 
         if (events == null) {
-            return null;
+            return;
+        }
+
+        for (Element event : events) {
+            StringBuilder instances = new StringBuilder();
+            ELEvaluator eval = CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
+            // Handle list of instance tag
+            resolveInstances(event, instances, appInst, conf, eval);
+            // Handle start-instance and end-instance
+            resolveInstanceRange(event, instances, appInst, conf, eval);
+            // Separate out the unresolved instances
+            separateResolvedAndUnresolved(event, instances);
+
+        }
+    }
+
+    private static void evaluateInputCheck(Element root, ELEvaluator evalInputLogic) throws Exception {
+        for (Object event : root.getChildren()) {
+            Element inputElement = (Element) event;
+
+            resolveAttribute("dataset", inputElement, evalInputLogic);
+            resolveAttribute("name", inputElement, evalInputLogic);
+            resolveAttribute("min", inputElement, evalInputLogic);
+            resolveAttribute("wait", inputElement, evalInputLogic);
+            if (!inputElement.getChildren().isEmpty()) {
+                evaluateInputCheck(inputElement, evalInputLogic);
+            }
         }
-        StringBuilder unresolvedList = new StringBuilder();
-        Map<String, StringBuilder> dependencyMap = new HashMap<String, StringBuilder>();
+    }
+
+    private static String resolveAttribute(String attrName, Element elem, ELEvaluator eval) throws CoordinatorJobException {
+        Attribute attr = elem.getAttribute(attrName);
+        String val = null;
+        if (attr != null) {
+            try {
+                val = CoordELFunctions.evalAndWrap(eval, attr.getValue().trim());
+            }
+            catch (Exception e) {
+                throw new CoordinatorJobException(ErrorCode.E1004, e.getMessage(), e);
+            }
+            attr.setValue(val);
+        }
+        return val;
+    }
+
+    public static void materializeInputDataEvents(List<Element> events, SyncCoordAction appInst, Configuration conf,
+            CoordinatorActionBean actionBean, boolean isInputLogicSpecified) throws Exception {
+
+        if (events == null) {
+            return;
+        }
+        CoordInputDependency coordPullInputDependency = CoordInputDependencyFactory
+                .createPullInputDependencies(isInputLogicSpecified);
+        CoordInputDependency coordPushInputDependency = CoordInputDependencyFactory
+                .createPushInputDependencies(isInputLogicSpecified);
+        Map<String, String> unresolvedList = new HashMap<String, String>();
+
         URIHandlerService uriService = Services.get().get(URIHandlerService.class);
-        StringBuilder pullMissingDep = null;
-        StringBuilder pushMissingDep = null;
 
         for (Element event : events) {
             StringBuilder instances = new StringBuilder();
@@ -619,41 +687,44 @@ public class CoordCommandUtils {
             resolveInstanceRange(event, instances, appInst, conf, eval);
             // Separate out the unresolved instances
             String resolvedList = separateResolvedAndUnresolved(event, instances);
+            String name = event.getAttribute("name").getValue();
+
             if (!resolvedList.isEmpty()) {
                 Element uri = event.getChild("dataset", event.getNamespace()).getChild("uri-template",
                         event.getNamespace());
+
                 String uriTemplate = uri.getText();
                 URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
                 URIHandler handler = uriService.getURIHandler(baseURI);
+                List<CoordInputInstance> inputInstanceList = new ArrayList<CoordInputInstance>();
+                // Split on the shared separator constant rather than a hard-coded "#".
+                for (String inputInstance : resolvedList.split(CoordELFunctions.INSTANCE_SEPARATOR)) {
+                    inputInstanceList.add(new CoordInputInstance(inputInstance, false));
+                }
+
                 if (handler.getDependencyType(baseURI).equals(DependencyType.PULL)) {
-                    pullMissingDep = (pullMissingDep == null) ? new StringBuilder(resolvedList) : pullMissingDep.append(
-                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+                    coordPullInputDependency.addInputInstanceList(name, inputInstanceList);
                 }
                 else {
-                    pushMissingDep = (pushMissingDep == null) ? new StringBuilder(resolvedList) : pushMissingDep.append(
-                            CoordELFunctions.INSTANCE_SEPARATOR).append(resolvedList);
+                    coordPushInputDependency.addInputInstanceList(name, inputInstanceList);
+
                 }
             }
 
-            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, event.getNamespace());
+            String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INSTANCES_TAG, event.getNamespace());
             if (tmpUnresolved != null) {
-                if (unresolvedList.length() > 0) {
-                    unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
-                }
-                unresolvedList.append(tmpUnresolved);
+                unresolvedList.put(name, tmpUnresolved);
             }
         }
-        if (unresolvedList.length() > 0) {
-            if (pullMissingDep == null) {
-                pullMissingDep = new StringBuilder();
-            }
-            pullMissingDep.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
+        for (Map.Entry<String, String> unresolved : unresolvedList.entrySet()) {
+            coordPullInputDependency.addUnResolvedList(unresolved.getKey(), unresolved.getValue());
         }
-        dependencyMap.put(DependencyType.PULL.name(), pullMissingDep);
-        dependencyMap.put(DependencyType.PUSH.name(), pushMissingDep);
-        return dependencyMap;
-    }
+        actionBean.setPullInputDependencies(coordPullInputDependency);
+        actionBean.setPushInputDependencies(coordPushInputDependency);
+        actionBean.setMissingDependencies(coordPullInputDependency.serialize());
+        actionBean.setPushMissingDependencies(coordPushInputDependency.serialize());
 
+    }
     /**
      * Get resolved string from missDepList
      *
@@ -797,4 +868,19 @@ public class CoordCommandUtils {
         }
         return nextNominalTime;
     }
+
+    /** Checks as {@code user} whether {@code sPath} exists, via the URIHandler resolved for that URI. */
+    public static boolean pathExists(String sPath, Configuration actionConf, String user) throws IOException,
+            URISyntaxException, URIHandlerException {
+        final URI location = new URI(sPath);
+        final URIHandler uriHandler = Services.get().get(URIHandlerService.class).getURIHandler(location);
+        return uriHandler.exists(location, actionConf, user);
+    }
+
+    /** Overload that runs the check as the user named by OozieClient.USER_NAME in {@code actionConf}. */
+    public static boolean pathExists(String sPath, Configuration actionConf) throws IOException, URISyntaxException,
+            URIHandlerException {
+        return pathExists(sPath, actionConf, ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), OozieClient.USER_NAME));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 39e6ac1..f6c1782 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.command.coord;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
 import org.apache.oozie.CoordinatorActionBean;
@@ -34,6 +35,7 @@ import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordUtils;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluatorUtil;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
 import org.apache.oozie.executor.jpa.CoordActionsActiveCountJPAExecutor;
@@ -148,7 +150,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
                     queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
                         Math.max(checkDelay, 0));
 
-                    if (coordAction.getPushMissingDependencies() != null) {
+                    if (!StringUtils.isEmpty(coordAction.getPushMissingDependencies())) {
                         // TODO: Delay in catchup mode?
                         queue(new CoordPushDependencyCheckXCommand(coordAction.getId(), true), 100);
                     }
@@ -485,7 +487,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
                 action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(),
                         nextTime, actualTime, lastActionNumber, jobConf, actionBean);
                 actionBean.setTimeOut(timeout);
-
                 if (!dryrun) {
                     storeToDB(actionBean, action, jobConf); // Storing to table
 
@@ -529,7 +530,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = "
                 + actionXml.length());
         actionBean.setActionXml(actionXml);
-
         insertList.add(actionBean);
         writeActionSlaRegistration(actionXml, actionBean, jobConf);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
index b05344d..2600a2b 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
@@ -21,10 +21,10 @@ package org.apache.oozie.command.coord;
 import java.io.IOException;
 import java.io.StringReader;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
@@ -34,7 +34,7 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.dependency.DependencyChecker;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
 import org.apache.oozie.dependency.ActionDependency;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
@@ -113,14 +113,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
             return null;
         }
 
-        String pushMissingDeps = coordAction.getPushMissingDependencies();
-        if (pushMissingDeps == null || pushMissingDeps.length() == 0) {
+        CoordInputDependency coordPushInputDependency = coordAction.getPushInputDependencies();
+        CoordInputDependency coordPullInputDependency = coordAction.getPullInputDependencies();
+        if (coordPushInputDependency.getMissingDependenciesAsList().size() == 0) {
             LOG.info("Nothing to check. Empty push missing dependency");
         }
         else {
-            String[] missingDepsArray = DependencyChecker.dependenciesAsArray(pushMissingDeps);
-            LOG.info("First Push missing dependency is [{0}] ", missingDepsArray[0]);
-            LOG.trace("Push missing dependencies are [{0}] ", pushMissingDeps);
+            List<String> missingDependenciesArray = coordPushInputDependency.getMissingDependenciesAsList();
+            LOG.info("First Push missing dependency is [{0}] ", missingDependenciesArray.get(0));
+            LOG.trace("Push missing dependencies are [{0}] ", missingDependenciesArray);
             if (registerForNotification) {
                 LOG.debug("Register for notifications is true");
             }
@@ -134,27 +135,27 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
                     throw new CommandException(ErrorCode.E1307, e.getMessage(), e);
                 }
 
+
+                boolean isChangeInDependency = true;
+                boolean timeout = false;
+                ActionDependency actionDependency = coordPushInputDependency.checkPushMissingDependencies(coordAction,
+                        registerForNotification);
                 // Check all dependencies during materialization to avoid registering in the cache.
                 // But check only first missing one afterwards similar to
                 // CoordActionInputCheckXCommand for efficiency. listPartitions is costly.
-                ActionDependency actionDep = DependencyChecker.checkForAvailability(missingDepsArray, actionConf,
-                        !registerForNotification);
+                if (actionDependency.getMissingDependencies().size() == missingDependenciesArray.size()) {
+                    isChangeInDependency = false;
+                }
+                else {
+                    coordPushInputDependency.setMissingDependencies(StringUtils.join(
+                            actionDependency.getMissingDependencies(), CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR));
+                }
 
-                boolean isChangeInDependency = true;
-                boolean timeout = false;
-                if (actionDep.getMissingDependencies().size() == 0) {
+                if (coordPushInputDependency.isDependencyMet()) {
                     // All push-based dependencies are available
-                    onAllPushDependenciesAvailable();
+                    onAllPushDependenciesAvailable(coordPullInputDependency.isDependencyMet());
                 }
                 else {
-                    if (actionDep.getMissingDependencies().size() == missingDepsArray.length) {
-                        isChangeInDependency = false;
-                    }
-                    else {
-                        String stillMissingDeps = DependencyChecker.dependenciesAsString(actionDep
-                                .getMissingDependencies());
-                        coordAction.setPushMissingDependencies(stillMissingDeps);
-                    }
                     // Checking for timeout
                     timeout = isTimeout();
                     if (timeout) {
@@ -166,15 +167,15 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
                     }
                 }
 
-                updateCoordAction(coordAction, isChangeInDependency);
+                updateCoordAction(coordAction, isChangeInDependency || coordPushInputDependency.isDependencyMet());
                 if (registerForNotification) {
-                    registerForNotification(actionDep.getMissingDependencies(), actionConf);
+                    registerForNotification(coordPushInputDependency.getMissingDependenciesAsList(), actionConf);
                 }
                 if (removeAvailDependencies) {
-                    unregisterAvailableDependencies(actionDep.getAvailableDependencies());
+                    unregisterAvailableDependencies(actionDependency.getAvailableDependencies());
                 }
                 if (timeout) {
-                    unregisterMissingDependencies(actionDep.getMissingDependencies(), actionId);
+                    unregisterMissingDependencies(coordPushInputDependency.getMissingDependenciesAsList(), actionId);
                 }
             }
             catch (Exception e) {
@@ -183,10 +184,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
                     LOG.debug("Queueing timeout command");
                     // XCommand.queue() will not work when there is a Exception
                     callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
-                    unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
+                    unregisterMissingDependencies(missingDependenciesArray, actionId);
                 }
-                else if (coordAction.getMissingDependencies() != null
-                        && coordAction.getMissingDependencies().length() > 0) {
+                else if (coordPullInputDependency.getMissingDependenciesAsList().size() > 0) {
                     // Queue again on exception as RecoveryService will not queue this again with
                     // the action being updated regularly by CoordActionInputCheckXCommand
                     callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId(),
@@ -221,18 +221,18 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
         return (timeOut >= 0) && (waitingTime > timeOut);
     }
 
-    protected void onAllPushDependenciesAvailable() throws CommandException {
-        coordAction.setPushMissingDependencies("");
+    protected void onAllPushDependenciesAvailable(boolean isPullDependencyMeet) throws CommandException {
         Services.get().get(PartitionDependencyManagerService.class)
                 .removeCoordActionWithDependenciesAvailable(coordAction.getId());
-        if (coordAction.getMissingDependencies() == null || coordAction.getMissingDependencies().length() == 0) {
+        if (isPullDependencyMeet) {
             Date nominalTime = coordAction.getNominalTime();
             Date currentTime = new Date();
             // The action should become READY only if current time > nominal time;
             // CoordActionInputCheckXCommand will take care of moving it to READY when it is nominal time.
             if (nominalTime.compareTo(currentTime) > 0) {
                 LOG.info("[" + actionId + "]::ActionInputCheck:: nominal Time is newer than current time. Current="
-                        + DateUtils.formatDateOozieTZ(currentTime) + ", nominal=" + DateUtils.formatDateOozieTZ(nominalTime));
+                        + DateUtils.formatDateOozieTZ(currentTime) + ", nominal="
+                        + DateUtils.formatDateOozieTZ(nominalTime));
             }
             else {
                 String actionXml = resolveCoordConfiguration();
@@ -248,6 +248,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
             // wait till RecoveryService kicks in
             queue(new CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()));
         }
+        coordAction.getPushInputDependencies().setDependencyMet(true);
+
     }
 
     private String resolveCoordConfiguration() throws CommandException {
@@ -255,7 +257,8 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
             Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
             StringBuilder actionXml = new StringBuilder(coordAction.getActionXml());
             String newActionXml = CoordActionInputCheckXCommand.resolveCoordConfiguration(actionXml, actionConf,
-                    actionId);
+                    actionId, coordAction.getPullInputDependencies(), coordAction
+                            .getPushInputDependencies());
             actionXml.replace(0, actionXml.length(), newActionXml);
             return actionXml.toString();
         }
@@ -270,6 +273,7 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
         if (jpaService != null) {
             try {
                 if (isChangeInDependency) {
+                    coordAction.setPushMissingDependencies(coordAction.getPushInputDependencies().serialize());
                     CoordActionQueryExecutor.getInstance().executeUpdate(
                             CoordActionQuery.UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, coordAction);
                     if (EventHandlerService.isEnabled() && coordAction.getStatus() != CoordinatorAction.Status.READY) {
@@ -286,6 +290,9 @@ public class CoordPushDependencyCheckXCommand extends CoordinatorXCommand<Void>
             catch (JPAExecutorException jex) {
                 throw new CommandException(ErrorCode.E1021, jex.getMessage(), jex);
             }
+            catch (IOException ioe) {
+                throw new CommandException(ErrorCode.E1021, ioe.getMessage(), ioe);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index d4d1c08..f1f9ab2 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -53,8 +53,10 @@ import org.apache.oozie.command.SubmitTransitionXCommand;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.CoordUtils;
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CoordMaterializeTriggerService;
@@ -799,6 +801,11 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
         resolveIODataset(eAppXml);
         resolveIOEvents(eAppXml, dataNameList);
 
+        if (CoordUtils.isInputLogicSpecified(eAppXml)) {
+            resolveInputLogic(eAppXml.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eAppXml.getNamespace()), evalInst,
+                    dataNameList);
+        }
+
         resolveTagContents("app-path", eAppXml.getChild("action", eAppXml.getNamespace()).getChild("workflow",
                 eAppXml.getNamespace()), evalNofuncs);
         // TODO: If action or workflow tag is missing, NullPointerException will
@@ -896,6 +903,26 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
 
     }
 
+    /** Walks the children of {@code root} depth first, calling resolveAttribute for each element's dataset, name,
+     *  or, and, combine attributes; each non-null "name" result is put into {@code dataNameList} as "data-in". */
+    private void resolveInputLogic(Element root, ELEvaluator evalInputLogic, HashMap<String, String> dataNameList)
+            throws Exception {
+        for (Object child : root.getChildren()) {
+            final Element logicElement = (Element) child;
+            resolveAttribute("dataset", logicElement, evalInputLogic);
+            final String resolvedName = resolveAttribute("name", logicElement, evalInputLogic);
+            resolveAttribute("or", logicElement, evalInputLogic);
+            resolveAttribute("and", logicElement, evalInputLogic);
+            resolveAttribute("combine", logicElement, evalInputLogic);
+            if (resolvedName != null) {
+                dataNameList.put(resolvedName, "data-in");
+            }
+            if (!logicElement.getChildren().isEmpty()) {
+                resolveInputLogic(logicElement, evalInputLogic, dataNameList);
+            }
+        }
+    }
+
     /**
      * Resolve input-events/dataset and output-events/dataset tags.
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
index f010a81..eabf473 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELConstants.java
@@ -33,4 +33,7 @@ public class CoordELConstants {
     public static final int SUBMIT_DAYS = 24 * 60;
 
     public static final String DEFAULT_DONE_FLAG = "_SUCCESS";
+    public static final String RESOLVED_PATH = "resolved_path";
+
+    public static final String IS_RESOLVED = "is_resolved";
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
index 8b2f456..fba8ac1 100644
--- a/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
+++ b/core/src/main/java/org/apache/oozie/coord/CoordELEvaluator.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.coord.input.dependency.CoordInputDependency;
+import org.apache.oozie.coord.input.logic.CoordInputLogicEvaluator;
 import org.apache.oozie.service.ELService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
@@ -141,7 +143,7 @@ public class CoordELEvaluator {
                     uris = uris.replaceAll(CoordELFunctions.INSTANCE_SEPARATOR, CoordELFunctions.DIR_SEPARATOR);
                     eval.setVariable(".dataout." + data.getAttributeValue("name"), uris);
                 }
-                if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+                if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
                     eval.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true");
                 }
             }
@@ -172,7 +174,13 @@ public class CoordELEvaluator {
      * @return configured ELEvaluator
      * @throws Exception : If there is any date-time string in wrong format, the exception is thrown
      */
+
     public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId) throws Exception {
+        return createDataEvaluator(eJob, conf, actionId, null, null); // delegate with null pull/push dependencies
+    }
+
+    public static ELEvaluator createDataEvaluator(Element eJob, Configuration conf, String actionId,
+            CoordInputDependency pullDependencies, CoordInputDependency pushDependencies) throws Exception {
         ELEvaluator e = Services.get().get(ELService.class).createEvaluator("coord-action-start");
         setConfigToEval(e, conf);
         SyncCoordAction appInst = new SyncCoordAction();
@@ -184,6 +192,12 @@ public class CoordELEvaluator {
             appInst.setTimeUnit(TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit")));
             appInst.setActionId(actionId);
             appInst.setName(eJob.getAttributeValue("name"));
+            appInst.setPullDependencies(pullDependencies);
+            appInst.setPushDependencies(pushDependencies);
+            if (CoordUtils.isInputLogicSpecified(eJob)) {
+                e.setVariable(".actionInputLogic",
+                        XmlUtils.prettyPrint(eJob.getChild(CoordInputLogicEvaluator.INPUT_LOGIC, eJob.getNamespace())).toString());
+            }
         }
         String strActualTime = eJob.getAttributeValue("action-actual-time");
         if (strActualTime != null) {
@@ -200,11 +214,14 @@ public class CoordELEvaluator {
                 }
                 else {
                 }
-                if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+                if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
                     e.setVariable(".datain." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
                     // check
                     // null
                 }
+                Element doneFlagElement = data.getChild("done-flag", data.getNamespace());
+                String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
+                e.setVariable(".datain." + data.getAttributeValue("name") + ".doneFlag", doneFlag);
             }
         }
         events = eJob.getChild("output-events", eJob.getNamespace());
@@ -217,7 +234,7 @@ public class CoordELEvaluator {
                 }
                 else {
                 }// TODO
-                if (data.getChild(CoordCommandUtils.UNRESOLVED_INST_TAG, data.getNamespace()) != null) {
+                if (data.getChild(CoordCommandUtils.UNRESOLVED_INSTANCES_TAG, data.getNamespace()) != null) {
                     e.setVariable(".dataout." + data.getAttributeValue("name") + ".unresolved", "true"); // TODO:
                     // check
                     // null