You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/04/19 08:34:59 UTC

[2/2] falcon git commit: FALCON-1802 EL Expressions support in Native Scheduler

FALCON-1802 EL Expressions support in Native Scheduler

Author: pavankumar526 <pa...@gmail.com>

Reviewers: "sandeepSamudrala <sa...@gmail.com>, Ajay Yadava <aj...@apache.org>"

Closes #57 from pavankumar526/FALCON-1802


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7e4dc0d9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7e4dc0d9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7e4dc0d9

Branch: refs/heads/master
Commit: 7e4dc0d923b5ef3d44f7fa5c85681d6fa89e0caf
Parents: 62393fe
Author: pavankumar526 <pa...@gmail.com>
Authored: Tue Apr 19 12:03:57 2016 +0530
Committer: pavankumar526 <pa...@gmail.com>
Committed: Tue Apr 19 12:03:57 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/entity/EntityUtil.java    |  27 +++
 .../falcon/expression/ExpressionHelper.java     |  44 ++++
 .../apache/falcon/persistence/EntityBean.java   |  15 +-
 .../org/apache/falcon/util/CalendarUnit.java    |  39 ++++
 .../java/org/apache/falcon/util/DateUtil.java   | 175 +++++++++++++++
 .../falcon/expression/ExpressionHelperTest.java |  24 ++
 oozie/pom.xml                                   |   6 +
 .../apache/falcon/oozie/OozieEntityBuilder.java |   9 +
 .../OozieOrchestrationWorkflowBuilder.java      |  29 ++-
 .../NativeOozieProcessWorkflowBuilder.java      | 222 +++++++++++++++++++
 .../execution/FalconExecutionService.java       |  16 +-
 .../execution/ProcessExecutionInstance.java     | 109 ++++++---
 .../falcon/execution/ProcessExecutor.java       |  37 +++-
 .../apache/falcon/execution/SchedulerUtil.java  |  34 +++
 .../service/impl/DataAvailabilityService.java   |   9 +-
 .../service/impl/SchedulerService.java          |   6 +-
 .../request/DataNotificationRequest.java        |  17 ++
 .../org/apache/falcon/predicate/Predicate.java  |   1 +
 .../org/apache/falcon/state/EntityState.java    |  11 +
 .../org/apache/falcon/state/InstanceState.java  |   6 +-
 .../org/apache/falcon/state/StateService.java   |  19 +-
 .../falcon/state/store/EntityStateStore.java    |   2 +-
 .../falcon/state/store/jdbc/BeanMapperUtil.java |  37 +++-
 .../falcon/state/store/jdbc/JDBCStateStore.java |  27 ++-
 .../falcon/workflow/engine/DAGEngine.java       |   6 +-
 .../workflow/engine/FalconWorkflowEngine.java   |  18 +-
 .../falcon/workflow/engine/OozieDAGEngine.java  |  75 +++----
 .../execution/FalconExecutionServiceTest.java   |   7 +-
 .../apache/falcon/execution/MockDAGEngine.java  |   4 +-
 scheduler/src/test/resources/runtime.properties |  25 +++
 src/conf/runtime.properties                     |  11 +
 .../apache/falcon/unit/FalconUnitTestBase.java  |   2 +-
 .../AbstractSchedulerManagerJerseyIT.java       |   7 +-
 .../InstanceSchedulerManagerJerseyIT.java       |  35 ++-
 .../resources/process-nolatedata-template.xml   |  50 +++++
 35 files changed, 1040 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 96befa1..8825a65 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -94,6 +94,15 @@ public final class EntityUtil {
     public static final String WF_LIB_SEPARATOR = ",";
     private static final String STAGING_DIR_NAME_SEPARATOR = "_";
 
+    public static final ThreadLocal<SimpleDateFormat> PATH_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+        @Override
+        protected SimpleDateFormat initialValue() {
+            SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmm");
+            format.setTimeZone(TimeZone.getTimeZone("UTC"));
+            return format;
+        }
+    };
+
     /** Priority with which the DAG will be scheduled.
      *  Matches the five priorities of Hadoop jobs.
      */
@@ -1082,4 +1091,22 @@ public final class EntityUtil {
         }
         return JOBPRIORITY.NORMAL;
     }
+
+
+    /**
+     * Evaluates feedpath based on instance time.
+     * @param feedPath
+     * @param instanceTime
+     * @return
+     */
+    public static String evaluateDependentPath(String feedPath, Date instanceTime) {
+        String timestamp = PATH_FORMAT.get().format(instanceTime);
+        String instancePath = feedPath.replaceAll("\\$\\{YEAR\\}", timestamp.substring(0, 4));
+        instancePath = instancePath.replaceAll("\\$\\{MONTH\\}", timestamp.substring(4, 6));
+        instancePath = instancePath.replaceAll("\\$\\{DAY\\}", timestamp.substring(6, 8));
+        instancePath = instancePath.replaceAll("\\$\\{HOUR\\}", timestamp.substring(8, 10));
+        instancePath = instancePath.replaceAll("\\$\\{MINUTE\\}", timestamp.substring(10, 12));
+        return instancePath;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 65aaeba..451cbba 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -21,6 +21,8 @@ package org.apache.falcon.expression;
 import org.apache.commons.el.ExpressionEvaluatorImpl;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.util.CalendarUnit;
+import org.apache.falcon.util.DateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -29,6 +31,7 @@ import javax.servlet.jsp.el.ExpressionEvaluator;
 import javax.servlet.jsp.el.FunctionMapper;
 import javax.servlet.jsp.el.VariableResolver;
 import java.lang.reflect.Method;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
@@ -53,6 +56,7 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
     private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
     private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
 
+
     public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() {
         @Override
         protected SimpleDateFormat initialValue() {
@@ -257,4 +261,44 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
         return originalValue;
     }
 
+    /**
+     * Converts date string to required format.
+     * @param dateTimeStr
+     * @param format
+     * @return
+     * @throws ParseException
+     */
+    public static String formatTime(String dateTimeStr, String format) throws ParseException {
+        Date dateTime = DateUtil.parseDateFalconTZ(dateTimeStr);
+        return DateUtil.formatDateCustom(dateTime, format);
+    }
+
+    /**
+     * Formats the instance and return.
+     * @return
+     */
+    public static String instanceTime() {
+        return DateUtil.formatDateFalconTZ(referenceDate.get());
+    }
+
+    /**
+     * EL function calculates date based on the following equation : newDate = baseDate + instance, * timeUnit.
+     * @param strBaseDate
+     * @param offset
+     * @param unit
+     * @return
+     * @throws Exception
+     */
+    public static String dateOffset(String strBaseDate, int offset, String unit) throws Exception {
+        Calendar baseCalDate = DateUtil.getCalendar(strBaseDate);
+        StringBuilder buffer = new StringBuilder();
+        baseCalDate.add(CalendarUnit.valueOf(unit).getCalendarUnit(), offset);
+        buffer.append(DateUtil.formatDateFalconTZ(baseCalDate));
+        return buffer.toString();
+    }
+
+    public static String user() {
+        return "${user.name}";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
index 274305c..f1c9cf3 100644
--- a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -24,6 +24,7 @@ import javax.persistence.CascadeType;
 import javax.persistence.Column;
 import javax.persistence.Entity;
 import javax.persistence.Id;
+import javax.persistence.Lob;
 import javax.persistence.NamedQueries;
 import javax.persistence.NamedQuery;
 import javax.persistence.OneToMany;
@@ -38,7 +39,7 @@ import java.util.List;
 @NamedQueries({
         @NamedQuery(name = PersistenceConstants.GET_ENTITY, query = "select OBJECT(a) from EntityBean a where a.id = :id"),
         @NamedQuery(name = PersistenceConstants.GET_ENTITY_FOR_STATE, query = "select OBJECT(a) from EntityBean a where a.state = :state"),
-        @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+        @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type, a.properties = :properties where a.id = :id"),
         @NamedQuery(name = PersistenceConstants.GET_ENTITIES_FOR_TYPE, query = "select OBJECT(a) from EntityBean a where a.type = :type"),
         @NamedQuery(name = PersistenceConstants.GET_ENTITIES, query = "select OBJECT(a) from EntityBean a"),
         @NamedQuery(name = PersistenceConstants.DELETE_ENTITY, query = "delete from EntityBean a where a.id = :id"),
@@ -68,6 +69,10 @@ public class EntityBean {
     @Column(name = "current_state")
     private String state;
 
+    @Column(name = "properties")
+    @Lob
+    private byte[] properties;
+
     @OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
     private List<InstanceBean> instanceBeans;
 
@@ -113,5 +118,13 @@ public class EntityBean {
     public void setInstanceBeans(List<InstanceBean> instanceBeans) {
         this.instanceBeans = instanceBeans;
     }
+
+    public byte[] getProperties() {
+        return properties;
+    }
+
+    public void setProperties(byte[] properties) {
+        this.properties = properties;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/CalendarUnit.java b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
new file mode 100644
index 0000000..f2a40b9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.util.Calendar;
+
+/**
+ * TimeUnit used for Date operations.
+ */
+public enum CalendarUnit {
+    MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH),
+    YEAR(Calendar.YEAR), END_OF_DAY(Calendar.DATE), END_OF_MONTH(Calendar.MONTH), CRON(0), NONE(-1);
+
+    private int calendarUnit;
+
+    private CalendarUnit(int calendarUnit) {
+        this.calendarUnit = calendarUnit;
+    }
+
+    public int getCalendarUnit() {
+        return calendarUnit;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index baf5b13..9e9b8e8 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -17,12 +17,20 @@
  */
 package org.apache.falcon.util;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.entity.v0.Frequency;
 
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
 import java.util.Calendar;
 import java.util.Date;
 import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Helper to get date operations.
@@ -39,8 +47,37 @@ public final class DateUtil {
 
     public static final long HOUR_IN_MILLIS = 60 * 60 * 1000;
 
+    private static final Pattern GMT_OFFSET_COLON_PATTERN = Pattern.compile("^GMT(\\-|\\+)(\\d{2})(\\d{2})$");
+
+    public static final TimeZone UTC = getTimeZone("UTC");
+
+    public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
+    private static String activeTimeMask = ISO8601_UTC_MASK;
+    private static TimeZone activeTimeZone = UTC;
+
+    private static final Pattern VALID_TIMEZONE_PATTERN = Pattern.compile("^UTC$|^GMT(\\+|\\-)\\d{4}$");
+
+    private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm";
+    private static boolean entityInUTC = true;
+
     private DateUtil() {}
 
+    /**
+     * Configures the Datetime parsing with  process timezone.
+     *
+     */
+    public static void setTimeZone(String tz) throws FalconException {
+        if (StringUtils.isBlank(tz)) {
+            tz = "UTC";
+        }
+        if (!VALID_TIMEZONE_PATTERN.matcher(tz).matches()) {
+            throw new FalconException("Invalid entity timezone, it must be 'UTC' or 'GMT(+/-)####");
+        }
+        activeTimeZone = TimeZone.getTimeZone(tz);
+        entityInUTC = activeTimeZone.equals(UTC);
+        activeTimeMask = (entityInUTC) ? ISO8601_UTC_MASK : ISO8601_TZ_MASK_WITHOUT_OFFSET + tz.substring(3);
+    }
+
     public static Date getNextMinute(Date time) throws Exception {
         Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         insCal.setTime(time);
@@ -99,4 +136,142 @@ public final class DateUtil {
     public static Date offsetTime(Date date, int seconds) {
         return new Date(1000L * seconds + date.getTime());
     }
+
+    /**
+     * Parses a datetime in ISO8601 format in the  process timezone.
+     *
+     * @param s string with the datetime to parse.
+     * @return the corresponding {@link java.util.Date} instance for the parsed date.
+     * @throws java.text.ParseException thrown if the given string was
+     * not an ISO8601 value for the  process timezone.
+     */
+    public static Date parseDateFalconTZ(String s) throws ParseException {
+        s = s.trim();
+        ParsePosition pos = new ParsePosition(0);
+        Date d = getISO8601DateFormat(activeTimeZone, activeTimeMask).parse(s, pos);
+        if (d == null) {
+            throw new ParseException("Could not parse [" + s + "] using [" + activeTimeMask + "] mask",
+                    pos.getErrorIndex());
+        }
+        if (s.length() > pos.getIndex()) {
+            throw new ParseException("Correct datetime string is followed by invalid characters: " + s, pos.getIndex());
+        }
+        return d;
+    }
+
+    private static DateFormat getISO8601DateFormat(TimeZone tz, String mask) {
+        DateFormat dateFormat = new SimpleDateFormat(mask);
+        // Stricter parsing to prevent dates such as 2011-12-50T01:00Z (December 50th) from matching
+        dateFormat.setLenient(false);
+        dateFormat.setTimeZone(tz);
+        return dateFormat;
+    }
+
+    private static DateFormat getSpecificDateFormat(String format) {
+        DateFormat dateFormat = new SimpleDateFormat(format);
+        dateFormat.setTimeZone(activeTimeZone);
+        return dateFormat;
+    }
+
+    /**
+     * Formats a {@link java.util.Date} as a string using the specified format mask.
+     * <p/>
+     * The format mask must be a {@link java.text.SimpleDateFormat} valid format mask.
+     *
+     * @param d {@link java.util.Date} to format.
+     * @return the string for the given date using the specified format mask,
+     * <code>NULL</code> if the {@link java.util.Date} instance was <code>NULL</code>
+     */
+    public static String formatDateCustom(Date d, String format) {
+        return (d != null) ? getSpecificDateFormat(format).format(d) : "NULL";
+    }
+
+    /**
+     * Formats a {@link java.util.Date} as a string in ISO8601 format using process timezone.
+     *
+     * @param d {@link java.util.Date} to format.
+     * @return the ISO8601 string for the given date, <code>NULL</code> if the {@link java.util.Date} instance was
+     * <code>NULL</code>
+     */
+    public static String formatDateFalconTZ(Date d) {
+        return (d != null) ? getISO8601DateFormat(activeTimeZone, activeTimeMask).format(d) : "NULL";
+    }
+
+    /**
+     * Returns the {@link java.util.TimeZone} for the given timezone ID.
+     *
+     * @param tzId timezone ID.
+     * @return  the {@link java.util.TimeZone} for the given timezone ID.
+     */
+    public static TimeZone getTimeZone(String tzId) {
+        if (tzId == null) {
+            throw new IllegalArgumentException("Timezone cannot be null");
+        }
+        tzId = handleGMTOffsetTZNames(tzId);    // account for GMT-####
+        TimeZone tz = TimeZone.getTimeZone(tzId);
+        // If these are not equal, it means that the tzId is not valid (invalid tzId's return GMT)
+        if (!tz.getID().equals(tzId)) {
+            throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
+        }
+        return tz;
+    }
+
+    /**
+     * {@link java.util.TimeZone#getTimeZone(String)} takes the timezone ID as an argument; for invalid IDs
+     * it returns the <code>GMT</code> TimeZone.  A timezone ID formatted like <code>GMT-####</code> is not a valid ID,
+     * however, it will actually map this to the <code>GMT-##:##</code> TimeZone, instead of returning the
+     * <code>GMT</code> TimeZone.  We check (later) check that a timezone ID is valid by calling
+     * {@link java.util.TimeZone#getTimeZone(String)} and seeing if the returned
+     * TimeZone ID is equal to the original; because we want to allow <code>GMT-####</code>, while still
+     * disallowing actual invalid IDs, we have to manually replace <code>GMT-####</code>
+     * with <code>GMT-##:##</code> first.
+     *
+     * @param tzId The timezone ID
+     * @return If tzId matches <code>GMT-####</code>, then we return <code>GMT-##:##</code>; otherwise,
+     * we return tzId unaltered
+     */
+    private static String handleGMTOffsetTZNames(String tzId) {
+        Matcher m = GMT_OFFSET_COLON_PATTERN.matcher(tzId);
+        if (m.matches() && m.groupCount() == 3) {
+            tzId = "GMT" + m.group(1) + m.group(2) + ":" + m.group(3);
+        }
+        return tzId;
+    }
+
+    /**
+     * Create a Calendar instance for UTC time zone using the specified date.
+     * @param dateString
+     * @return appropriate Calendar object
+     * @throws Exception
+     */
+    public static Calendar getCalendar(String dateString) throws Exception {
+        return getCalendar(dateString, activeTimeZone);
+    }
+
+    /**
+     * Create a Calendar instance using the specified date and Time zone.
+     * @param dateString
+     * @param tz : TimeZone
+     * @return appropriate Calendar object
+     * @throws Exception
+     */
+    public static Calendar getCalendar(String dateString, TimeZone tz) throws Exception {
+        Date date = DateUtil.parseDateFalconTZ(dateString);
+        Calendar calDate = Calendar.getInstance();
+        calDate.setTime(date);
+        calDate.setTimeZone(tz);
+        return calDate;
+    }
+
+    /**
+     * Formats a {@link java.util.Calendar} as a string in ISO8601 format process timezone.
+     *
+     * @param c {@link java.util.Calendar} to format.
+     * @return the ISO8601 string for the given date, <code>NULL</code> if the {@link java.util.Calendar} instance was
+     * <code>NULL</code>
+     */
+    public static String formatDateFalconTZ(Calendar c) {
+        return (c != null) ? formatDateFalconTZ(c.getTime()) : "NULL";
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
index da5dbca..b3895c3 100644
--- a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
@@ -81,4 +81,28 @@ public class ExpressionHelperTest {
             {"future(1,0)", "2015-02-01T00:00Z"},
         };
     }
+
+    @Test
+    public void testFormatTime() throws FalconException {
+        String output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy\")",
+                String.class);
+        Assert.assertEquals(output, "2016");
+        output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM\")",
+                String.class);
+        Assert.assertEquals(output, "2016-02");
+        output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM-dd\")",
+                String.class);
+        Assert.assertEquals(output, "2016-02-01");
+    }
+
+
+    @Test
+    public void testOffsetAndInstanceTime() throws FalconException {
+        String date = expressionHelper.evaluate("dateOffset(instanceTime(), 1, 'DAY')", String.class);
+        Assert.assertEquals(date, "2015-02-02T00:00Z");
+        date = expressionHelper.evaluate("dateOffset(instanceTime(), 3, 'HOUR')", String.class);
+        Assert.assertEquals(date, "2015-02-01T03:00Z");
+        date = expressionHelper.evaluate("dateOffset(instanceTime(), -25, 'MINUTE')", String.class);
+        Assert.assertEquals(date, "2015-01-31T23:35Z");
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 04b3df6..c14c625 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -103,6 +103,12 @@
             <scope>compile</scope>
         </dependency>
 
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+            <version>${joda.version}</version>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index a36ee79..a856f8a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -109,6 +109,14 @@ public abstract class OozieEntityBuilder<T extends Entity> {
     public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException;
 
     public Properties build(Cluster cluster, Path buildPath, Map<String, String> properties) throws FalconException {
+        Properties props = new Properties();
+        if (properties != null) {
+            props.putAll(properties);
+        }
+        return build(cluster, buildPath, props);
+    }
+
+    public Properties build(Cluster cluster, Path buildPath, Properties properties) throws FalconException {
         Properties builderProperties = build(cluster, buildPath);
         if (properties == null || properties.isEmpty()) {
             return builderProperties;
@@ -130,6 +138,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
         return propertiesCopy;
     }
 
+
     protected String getStoragePath(Path path) {
         if (path != null) {
             return getStoragePath(path.toString());

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 181f2d2..562627e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -37,6 +37,7 @@ import org.apache.falcon.oozie.feed.FSReplicationWorkflowBuilder;
 import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
 import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder;
 import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
 import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
 import org.apache.falcon.oozie.workflow.ACTION;
@@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
 
 import javax.xml.bind.JAXBElement;
 import javax.xml.namespace.QName;
@@ -93,11 +95,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
             new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
 
     private LifeCycle lifecycle;
+    private DateTime nominalTime;
 
     protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
     protected static final String MR_QUEUE_NAME = "queueName";
     protected static final String MR_JOB_PRIORITY = "jobPriority";
 
+    /**
+     * Represents Scheduler for Entities.
+     */
+    public enum Scheduler {
+        OOZIE, NATIVE
+    }
+
     public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
         super(entity);
         this.lifecycle = lifecycle;
@@ -115,7 +125,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         super(entity);
     }
 
-    public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+    public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster,
+                                                        Tag lifecycle) throws FalconException {
+        return get(entity, cluster, lifecycle, Scheduler.OOZIE);
+    }
+
+    public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
+                                                        Scheduler scheduler)
         throws FalconException {
         switch (entity.getEntityType()) {
         case FEED:
@@ -166,6 +182,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
                 return new PigProcessWorkflowBuilder(process);
 
             case OOZIE:
+                if (Scheduler.NATIVE == scheduler) {
+                    return new NativeOozieProcessWorkflowBuilder(process);
+                }
                 return new OozieProcessWorkflowBuilder(process);
 
             case HIVE:
@@ -497,4 +516,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
         }
         return conf;
     }
+
+    public void setNominalTime(DateTime nominalTime) {
+        this.nominalTime = nominalTime;
+    }
+
+    public DateTime getNominalTime() {
+        return nominalTime;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
new file mode 100644
index 0000000..78e049d
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Workflow Builder for oozie process in case of Native Scheduler.
+ */
+public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuilder {
+
+    private static final ExpressionHelper EXPRESSION_HELPER = ExpressionHelper.get();
+    private static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+
+    public NativeOozieProcessWorkflowBuilder(org.apache.falcon.entity.v0.process.Process entity) {
+        super(entity);
+    }
+
+    @Override
+    public java.util.Properties build(Cluster cluster,
+                                      Path buildPath, Properties suppliedProps) throws FalconException {
+        Properties elProps = new Properties();
+        DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+        elProps.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), fmt.print(getNominalTime()));
+        elProps.put(WorkflowExecutionArgs.TIMESTAMP.getName(), fmt.print(getNominalTime()));
+        elProps.put(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED.getName(), "true");
+        elProps.put(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED.getName(), "false"); //check true or false
+
+
+        DateUtil.setTimeZone(entity.getTimezone().getID());
+        ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+        elProps.putAll(getInputProps(cluster));
+        elProps.putAll(getOutputProps());
+        elProps.putAll(evalProperties());
+        Properties buildProps  = build(cluster, buildPath);
+        buildProps.putAll(elProps);
+        copyPropsWithoutOverride(buildProps, suppliedProps);
+        return buildProps;
+    }
+
+    private void copyPropsWithoutOverride(Properties buildProps, Properties suppliedProps) {
+        if (suppliedProps == null || suppliedProps.isEmpty()) {
+            return;
+        }
+        for (String propertyName : suppliedProps.stringPropertyNames()) {
+            if (buildProps.containsKey(propertyName)) {
+                LOG.warn("User provided property {} is already declared in the entity and will be ignored.",
+                    propertyName);
+                continue;
+            }
+            String propertyValue = suppliedProps.getProperty(propertyName);
+            buildProps.put(propertyName, propertyValue);
+        }
+    }
+
+    private Properties evalProperties() throws FalconException {
+        Properties props = new Properties();
+        org.apache.falcon.entity.v0.process.Properties processProps = entity.getProperties();
+        for (Property property : processProps.getProperties()) {
+            String propName = property.getName();
+            String propValue = property.getValue();
+            String evalExp = EXPRESSION_HELPER.evaluateFullExpression(propValue, String.class);
+            props.put(propName, evalExp);
+        }
+        return props;
+    }
+
+    private Properties getOutputProps() throws FalconException {
+        Properties props = new Properties();
+        if (entity.getOutputs() == null) {
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
+            return props;
+        }
+        List<String> feedNames = new ArrayList<>();
+        List<String> feedInstancePaths= new ArrayList<>();
+        for (Output output : entity.getOutputs().getOutputs()) {
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+            feedNames.add(feed.getName());
+            String outputExp = output.getInstance();
+            Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class);
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                        EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+                if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+                    continue;
+                }
+
+                List<Location> locations = FeedHelper.getLocations(cluster, feed);
+                for (Location loc : locations) {
+                    String path = EntityUtil.evaluateDependentPath(loc.getPath(), outTime);
+                    path = getStoragePath(path);
+                    if (loc.getType() != LocationType.DATA) {
+                        props.put(output.getName() + "." + loc.getType().toString().toLowerCase(), path);
+                    } else {
+                        props.put(output.getName(), path);
+                    }
+                    feedInstancePaths.add(path);
+                }
+            }
+        }
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames, ","));
+        props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths, ","));
+        return props;
+    }
+
+    private Properties getInputProps(Cluster clusterObj) throws FalconException {
+        Properties props = new Properties();
+
+        if (entity.getInputs() == null) {
+            props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), NONE);
+            props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE);
+            return props;
+        }
+        List<String> falconInputFeeds = new ArrayList<>();
+        List<String> falconInputNames = new ArrayList<>();
+        List<String> falconInputPaths = new ArrayList<>();
+        List<String> falconInputFeedStorageTypes = new ArrayList<>();
+        for (Input input : entity.getInputs().getInputs()) {
+            Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+            Storage storage = FeedHelper.createStorage(clusterObj, feed);
+            if (storage.getType() != Storage.TYPE.FILESYSTEM) {
+                throw new UnsupportedOperationException("Storage Type not supported " + storage.getType());
+            }
+            falconInputFeeds.add(feed.getName());
+            falconInputNames.add(input.getName());
+            falconInputFeedStorageTypes.add(storage.getType().name());
+            String partition  = input.getPartition();
+
+            String startTimeExp = input.getStart();
+            String endTimeExp = input.getEnd();
+            ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+            Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class);
+            Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class);
+
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                        EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+                if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+                    continue;
+                }
+
+                List<Location> locations = FeedHelper.getLocations(cluster, feed);
+                for (Location loc : locations) {
+                    if (loc.getType() != LocationType.DATA) {
+                        continue;
+                    }
+                    List<String> paths = new ArrayList<>();
+                    List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+                            startTime, endTime); // test when startTime and endTime are equal.
+                    for (Date instanceTime : instanceTimes) {
+                        String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
+                        if (StringUtils.isNotBlank(partition)) {
+                            if (!path.endsWith("/") && !partition.startsWith("/")) {
+                                path = path + "/";
+                            }
+                            path = path + partition;
+                        }
+                        path = getStoragePath(path);
+                        paths.add(path);
+                    }
+                    if (loc.getType() != LocationType.DATA) {
+                        props.put(input.getName() + "." + loc.getType().toString().toLowerCase(),
+                                StringUtils.join(paths, ","));
+                    } else {
+                        props.put(input.getName(), StringUtils.join(paths, ","));
+                    }
+                    falconInputPaths.add(StringUtils.join(paths, ","));
+                }
+            }
+        }
+        props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(falconInputNames, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(falconInputPaths, "#"));
+        props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(),
+                StringUtils.join(falconInputFeedStorageTypes, "#"));
+        return props;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index 93c894d..a969c7a 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -35,6 +35,7 @@ import org.apache.falcon.state.store.AbstractStateStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -203,10 +204,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
      * Schedules an entity.
      *
      * @param entity
+     * @param properties
      * @throws FalconException
      */
-    public void schedule(Entity entity) throws FalconException {
-        StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this);
+    public void schedule(Entity entity, Properties properties) throws FalconException {
+        StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this, properties);
     }
 
     /**
@@ -256,4 +258,14 @@ public final class FalconExecutionService implements FalconService, EntityStateC
             throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist.");
         }
     }
+
+    /**
+     * Schedules an entity.
+     *
+     * @param entity
+     * @throws FalconException
+     */
+    public void schedule(Process entity) throws FalconException {
+        schedule(entity, null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 2d666c3..49e1120 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Cluster;
 import org.apache.falcon.entity.v0.process.Input;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
 import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.event.Event;
@@ -48,7 +49,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
@@ -61,12 +64,14 @@ import java.util.Properties;
 public class ProcessExecutionInstance extends ExecutionInstance {
     private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
     private final Process process;
-    private List<Predicate> awaitedPredicates = new ArrayList<>();
+    private List<Predicate> awaitedPredicates = Collections.synchronizedList(new ArrayList<Predicate>());
     private DAGEngine dagEngine = null;
-    private boolean hasTimedOut = false;
+    protected boolean hasTimedOut = false;
     private InstanceID id;
     private int instanceSequence;
+    private boolean areDataPredicatesEmpty;
     private final FalconExecutionService executionService = FalconExecutionService.get();
+    private final ExpressionHelper expressionHelper = ExpressionHelper.get();
 
     /**
      * Constructor.
@@ -83,7 +88,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         this.id = new InstanceID(process, cluster, getInstanceTime());
         computeInstanceSequence();
         dagEngine = DAGEngineFactory.getDAGEngine(cluster);
-        registerForNotifications(false);
+        areDataPredicatesEmpty = true;
     }
 
     /**
@@ -112,7 +117,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
 
     // Currently, registers for only data notifications to ensure gating conditions are met.
     // Can be extended to register for other notifications.
-    private void registerForNotifications(boolean isResume) throws FalconException {
+    public void registerForNotifications(boolean isResume) throws FalconException {
         if (process.getInputs() == null) {
             return;
         }
@@ -122,14 +127,40 @@ public class ProcessExecutionInstance extends ExecutionInstance {
                 continue;
             }
             Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
-            List<Path> paths = new ArrayList<>();
+            String startTimeExp = input.getStart();
+            String endTimeExp = input.getEnd();
+            DateTime processInstanceTime = getInstanceTime();
+            expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis()));
+
+            Date startTime = expressionHelper.evaluate(startTimeExp, Date.class);
+            Date endTime = expressionHelper.evaluate(endTimeExp, Date.class);
+
             for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+                        EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+                if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+                    continue;
+                }
+                List<Path> paths = new ArrayList<>();
                 List<Location> locations = FeedHelper.getLocations(cluster, feed);
                 for (Location loc : locations) {
                     if (loc.getType() != LocationType.DATA) {
                         continue;
                     }
-                    paths.add(new Path(loc.getPath()));
+                    List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+                            startTime, endTime);
+                    for (Date instanceTime : instanceTimes) {
+                        String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
+                        if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) {
+                            if (!path.endsWith("/")) {
+                                path = path + "/";
+                            }
+                            path = path + feed.getAvailabilityFlag();
+                        }
+                        if (!paths.contains(new Path(path))) {
+                            paths.add(new Path(path));
+                        }
+                    }
                 }
 
                 Predicate predicate = Predicate.createDataPredicate(paths);
@@ -137,21 +168,19 @@ public class ProcessExecutionInstance extends ExecutionInstance {
                 if (isResume && !awaitedPredicates.contains(predicate)) {
                     continue;
                 }
-                // TODO : Revisit this once the Data Notification Service has been built
-                // TODO Very IMP :  Need to change the polling frequency
+                addDataPredicate(predicate);
                 DataAvailabilityService.DataRequestBuilder requestBuilder =
                         (DataAvailabilityService.DataRequestBuilder)
                                 NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
                                         .createRequestBuilder(executionService, getId());
                 requestBuilder.setLocations(paths)
                         .setCluster(cluster.getName())
-                        .setPollingFrequencyInMillis(100)
+                        .setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency()))
                         .setTimeoutInMillis(getTimeOutInMillis())
                         .setLocations(paths);
                 NotificationServicesRegistry.register(requestBuilder.build());
-                LOG.info("Registered for a data notification for process {} for data location {}",
-                        process.getName(), StringUtils.join(paths, ","));
-                awaitedPredicates.add(predicate);
+                LOG.info("Registered for a data notification for process {} of instance time {} for data location {}",
+                        process.getName(), getInstanceTime(), StringUtils.join(paths, ","));
             }
         }
     }
@@ -170,22 +199,26 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         case DATA_AVAILABLE:
             // Data has not become available and the wait time has passed
             if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
-                if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {
-                    hasTimedOut = true;
-                }
-            } else {
-                // If the event matches any of the awaited predicates, remove the predicate of the awaited list
-                Predicate toRemove = null;
-                for (Predicate predicate : awaitedPredicates) {
-                    if (predicate.evaluate(Predicate.getPredicate(event))) {
-                        toRemove = predicate;
-                        break;
-                    }
-                }
-                if (toRemove != null) {
-                    awaitedPredicates.remove(toRemove);
+                hasTimedOut = true;
+            }
+            // If the event matches any of the awaited predicates, remove the predicate of the awaited list
+            Predicate toRemove = null;
+            synchronized (awaitedPredicates) {
+            Iterator<Predicate> iterator = awaitedPredicates.iterator();
+            while (iterator.hasNext()) {
+                Predicate predicate = iterator.next();
+                if (predicate.evaluate(Predicate.getPredicate(event))) {
+                    toRemove = predicate;
+                    break;
                 }
             }
+            if (toRemove != null) {
+                awaitedPredicates.remove(toRemove);
+            }
+            if (awaitedPredicates.size() == 0) {
+                areDataPredicatesEmpty = true;
+            }
+        }
             break;
         default:
         }
@@ -200,13 +233,16 @@ public class ProcessExecutionInstance extends ExecutionInstance {
         if (awaitedPredicates.isEmpty()) {
             return true;
         } else {
-            // If it is waiting to be scheduled, it is in ready.
-            for (Predicate predicate : awaitedPredicates) {
-                if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
-                    return false;
+            synchronized (awaitedPredicates) {
+                Iterator<Predicate> iterator = awaitedPredicates.iterator();
+                while (iterator.hasNext()) {
+                    Predicate predicate = iterator.next();
+                    if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
+                        return false;
+                    }
                 }
+                return true;
             }
-            return true;
         }
     }
 
@@ -338,4 +374,15 @@ public class ProcessExecutionInstance extends ExecutionInstance {
     public void rerun() throws FalconException {
         registerForNotifications(false);
     }
+
+    public boolean areDataAwaitingPredicatesEmpty() {
+        return areDataPredicatesEmpty;
+    }
+
+    protected synchronized void addDataPredicate(Predicate predicate) {
+        synchronized (awaitedPredicates) {
+            awaitedPredicates.add(predicate);
+            areDataPredicatesEmpty = false;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 0fc68f0..fec5f31 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.exception.InvalidStateTransitionException;
 import org.apache.falcon.exception.StateStoreException;
 import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
 import org.apache.falcon.notification.service.event.Event;
 import org.apache.falcon.notification.service.event.EventType;
 import org.apache.falcon.notification.service.event.JobCompletedEvent;
@@ -93,8 +94,9 @@ public class ProcessExecutor extends EntityExecutor {
             initInstances();
         }
         // Check to handle restart and restoration from state store.
-        if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) {
-            dryRun();
+        EntityState entityState = STATE_STORE.getEntity(id.getEntityID());
+        if (entityState.getCurrentState() != EntityState.STATE.SCHEDULED) {
+            dryRun(entityState.getProperties());
         } else {
             LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster);
             LOG.info("Loading instances for process {} from state store.", process.getName());
@@ -103,8 +105,8 @@ public class ProcessExecutor extends EntityExecutor {
         registerForNotifications(getLastInstanceTime());
     }
 
-    private void dryRun() throws FalconException {
-        DAGEngineFactory.getDAGEngine(cluster).submit(process);
+    private void dryRun(Properties properties) throws FalconException {
+        DAGEngineFactory.getDAGEngine(cluster).submit(process, properties);
     }
 
     // Initializes the cache of execution instances. Cache is backed by the state store.
@@ -419,6 +421,30 @@ public class ProcessExecutor extends EntityExecutor {
                     stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
                 }
                 break;
+            case DATA_AVAILABLE:
+                instance = instances.get((InstanceID)event.getTarget());
+                instance.onEvent(event);
+                switch (((DataEvent) event).getStatus()) {
+                case AVAILABLE:
+                    if (instance.areDataAwaitingPredicatesEmpty() && !instance.hasTimedOut) {
+                        LOG.info("Data conditions met for instance {} and scheduled for running ", instance.getId());
+                        stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+                    } else if (instance.areDataAwaitingPredicatesEmpty()) {
+                        LOG.info("Instance {} timedout since input data not available", instance.getId());
+                        stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+                    } else {
+                        STATE_STORE.updateExecutionInstance(new InstanceState(instance));
+                    }
+                    break;
+                case UNAVAILABLE:
+                    if (instance.areDataAwaitingPredicatesEmpty()) {
+                        stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+                    }
+                    break;
+                default:
+                    throw new InvalidStateTransitionException("Invalid Data event status.");
+                }
+                break;
             default:
                 if (isTriggerEvent(event)) {
                     instance = buildInstance(event);
@@ -473,7 +499,8 @@ public class ProcessExecutor extends EntityExecutor {
         return event.getTarget().equals(id)
                 || event.getType() == EventType.JOB_COMPLETED
                 || event.getType() == EventType.JOB_SCHEDULED
-                || event.getType() == EventType.RE_RUN;
+                || event.getType() == EventType.RE_RUN
+                || event.getType() == EventType.DATA_AVAILABLE;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
index 3e7fc9b..236da11 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.execution;
 
 import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.util.RuntimeProperties;
 import org.joda.time.DateTime;
 
 /**
@@ -27,6 +28,14 @@ public final class SchedulerUtil {
 
     private static final long MINUTE_IN_MS = 60 * 1000L;
     private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+    public static final String MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+            "falcon.scheduler.minutely.process.polling.frequency.millis";
+    public static final String HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+            "falcon.scheduler.hourly.process.polling.frequency.millis";
+    public static final String DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+            "falcon.scheduler.daily.process.polling.frequency.millis";
+    public static final String MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+            "falcon.scheduler.monthly.process.polling.frequency.millis";
 
     private SchedulerUtil(){};
 
@@ -51,4 +60,29 @@ public final class SchedulerUtil {
             throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name());
         }
     }
+
+    /**
+     *
+     * @param frequency
+     * @return
+     */
+    public static long getPollingFrequencyinMillis(Frequency frequency) {
+        switch (frequency.getTimeUnit()) {
+        case minutes:
+            return Long.parseLong(RuntimeProperties.get().getProperty(MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS,
+                    "20000"));
+        case hours:
+            return Long.parseLong(RuntimeProperties.get().getProperty(HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS,
+                    "60000"));
+        case days:
+            return Long.parseLong(RuntimeProperties.get().getProperty(DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS,
+                    "120000"));
+        case months:
+            return Long.parseLong(RuntimeProperties.get().getProperty(MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS,
+                    "180000"));
+        default:
+            throw new IllegalArgumentException("Unhandled frequency time unit " + frequency.getTimeUnit());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
index 732da62..1240be9 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -67,6 +67,9 @@ public class DataAvailabilityService implements FalconNotificationService {
     public void register(NotificationRequest request) throws NotificationServiceException {
         LOG.info("Registering Data notification for " + request.getCallbackId().toString());
         DataNotificationRequest dataNotificationRequest = (DataNotificationRequest) request;
+        if (instancesToIgnore.containsKey(dataNotificationRequest.getCallbackId())) {
+            instancesToIgnore.remove(dataNotificationRequest.getCallbackId());
+        }
         delayQueue.offer(dataNotificationRequest);
     }
 
@@ -246,7 +249,11 @@ public class DataAvailabilityService implements FalconNotificationService {
                                              Map<Path, Boolean> locations) throws IOException {
             for (Path path : unAvailablePaths) {
                 if (fs.exists(path)) {
-                    locations.put(path, true);
+                    if (locations.containsKey(path)) {
+                        locations.put(path, true);
+                    } else {
+                        locations.put(new Path(path.toUri().getPath()), true);
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 401c57e..a110e64 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -43,6 +43,8 @@ import org.apache.falcon.notification.service.request.JobScheduleNotificationReq
 import org.apache.falcon.notification.service.request.NotificationRequest;
 import org.apache.falcon.predicate.Predicate;
 import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
 import org.apache.falcon.state.ID;
 import org.apache.falcon.state.InstanceID;
 import org.apache.falcon.state.InstanceState;
@@ -304,7 +306,9 @@ public class SchedulerService implements FalconNotificationService, Notification
                             DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced);
                         }
                     } else {
-                        externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
+                        EntityState entityState = STATE_STORE.getEntity(new EntityID(instance.getEntity()));
+                        externalId = DAGEngineFactory.getDAGEngine(instance.getCluster())
+                                .run(instance, entityState.getProperties());
                     }
                     LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
                     JobScheduledEvent event = new JobScheduledEvent(instance.getId(),

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
index c7dd5d3..9e2b993 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -151,6 +151,15 @@ public class DataNotificationRequest extends NotificationRequest implements Dela
         if (!locations.equals(that.locations)) {
             return false;
         }
+        if (pollingFrequencyInMillis != (that.pollingFrequencyInMillis)) {
+            return false;
+        }
+        if (timeoutInMillis != that.timeoutInMillis) {
+            return false;
+        }
+        if (createdTimeInMillis != that.createdTimeInMillis) {
+            return false;
+        }
         return true;
     }
 
@@ -158,8 +167,16 @@ public class DataNotificationRequest extends NotificationRequest implements Dela
     public int hashCode() {
         int result = cluster.hashCode();
         result = 31 * result + (locations != null ? locations.hashCode() : 0);
+        result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode();
+        result = 31 * result + Long.valueOf(timeoutInMillis).hashCode();
+        result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode();
         return result;
     }
 
+    @Override
+    public String toString() {
+        return "cluster: " + this.getCluster() + " locations: " + this.locations + " createdTime: "
+                + this.createdTimeInMillis;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index c248db6..93dcb12 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -163,6 +163,7 @@ public class Predicate implements Serializable {
      * @return
      */
     public static Predicate createDataPredicate(List<Path> paths) {
+        Collections.sort(paths);
         return new Predicate(TYPE.DATA)
                 .addClause("path", StringUtils.join(paths, ","));
     }

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index 38479a4..1b26c7a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -20,6 +20,8 @@ package org.apache.falcon.state;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.exception.InvalidStateTransitionException;
 
+import java.util.Properties;
+
 /**
  * Represents the state of a schedulable entity.
  * Implements {@link org.apache.falcon.state.StateMachine} for an entity.
@@ -27,8 +29,17 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
 public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> {
     private Entity entity;
     private STATE currentState;
+    private Properties properties;
     private static final STATE INITIAL_STATE = STATE.SUBMITTED;
 
+    public Properties getProperties() {
+        return properties;
+    }
+
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+
     /**
      * Enumerates all the valid states of a schedulable entity and the valid transitions from that state.
      */

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index b862e4d..f5e6f5b 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -270,7 +270,11 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
 
     @Override
     public String toString() {
-        return instance.getId().toString() + "STATE: " + currentState.toString();
+        StringBuilder output = new StringBuilder();
+        if (instance.getId() != null) {
+            output.append(instance.getId());
+        }
+        return output.append("STATE").append(currentState.toString()).toString();
     }
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index 638bb6e..8a66fb0 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -21,11 +21,14 @@ import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.ProcessExecutionInstance;
 import org.apache.falcon.state.store.AbstractStateStore;
 import org.apache.falcon.state.store.StateStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Properties;
+
 /**
  * A service that fetches state from state store, handles state transitions of entities and instances,
  * invokes state change handler and finally persists the new state in the state store.
@@ -62,14 +65,18 @@ public final class StateService {
      * @param handler
      * @throws FalconException
      */
-    public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
-        throws FalconException {
+    public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler,
+                                  Properties props) throws FalconException {
         EntityID id = new EntityID(entity);
         if (!stateStore.entityExists(id)) {
             // New entity
             if (event == EntityState.EVENT.SUBMIT) {
                 callbackHandler(entity, EntityState.EVENT.SUBMIT, handler);
-                stateStore.putEntity(new EntityState(entity));
+                EntityState entityState = new EntityState(entity);
+                if (props != null && !props.isEmpty()) {
+                    entityState.setProperties(props);
+                }
+                stateStore.putEntity(entityState);
                 LOG.debug("Entity {} submitted due to event {}.", id, event.name());
             } else {
                 throw new FalconException("Entity " + id + " does not exist in state store.");
@@ -90,6 +97,11 @@ public final class StateService {
         }
     }
 
+    public void handleStateChange(Entity entity, EntityState.EVENT event,
+                                  EntityStateChangeHandler handler) throws FalconException {
+        handleStateChange(entity, event, handler, null);
+    }
+
     // Invokes the right method on the state change handler
     private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
         throws FalconException {
@@ -133,6 +145,7 @@ public final class StateService {
             if (event == InstanceState.EVENT.TRIGGER) {
                 callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
                 stateStore.putExecutionInstance(new InstanceState(instance));
+                ((ProcessExecutionInstance) instance).registerForNotifications(false);
                 LOG.debug("Instance {} triggered due to event {}.", id, event.name());
             } else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) {
                 callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler);

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index a7deb89..10490e4 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -45,7 +45,7 @@ public interface EntityStateStore {
      * @param entityId
      * @return true, if entity exists in store.
      */
-    boolean entityExists(EntityID entityId) throws StateStoreException;;
+    boolean entityExists(EntityID entityId) throws StateStoreException;
 
     /**
      * @param state

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 3384186..228b1f9 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -59,7 +59,7 @@ public final class BeanMapperUtil {
      * @param entityState
      * @return
      */
-    public static EntityBean convertToEntityBean(EntityState entityState) {
+    public static EntityBean convertToEntityBean(EntityState entityState) throws IOException {
         EntityBean entityBean = new EntityBean();
         Entity entity = entityState.getEntity();
         String id = new EntityID(entity).getKey();
@@ -67,6 +67,10 @@ public final class BeanMapperUtil {
         entityBean.setName(entity.getName());
         entityBean.setState(entityState.getCurrentState().toString());
         entityBean.setType(entity.getEntityType().toString());
+        if (entityState.getProperties() != null && !entityState.getProperties().isEmpty()) {
+            byte[] props = getProperties(entityState);
+            entityBean.setProperties(props);
+        }
         return entityBean;
     }
 
@@ -76,11 +80,26 @@ public final class BeanMapperUtil {
      * @return
      * @throws StateStoreException
      */
-    public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException {
+    public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException, IOException {
         try {
             Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
             EntityState entityState = new EntityState(entity);
             entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState()));
+            byte[] result = entityBean.getProperties();
+            if (result != null && result.length != 0) {
+                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result);
+                ObjectInputStream in = null;
+                Properties properties = null;
+                try {
+                    in = new ObjectInputStream(byteArrayInputStream);
+                    properties = (Properties) in.readObject();
+                } catch (ClassNotFoundException e) {
+                    throw new IOException(e);
+                } finally {
+                    IOUtils.closeQuietly(in);
+                }
+                entityState.setProperties(properties);
+            }
             return entityState;
         } catch (FalconException e) {
             throw new StateStoreException(e);
@@ -94,7 +113,7 @@ public final class BeanMapperUtil {
      * @throws StateStoreException
      */
     public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans)
-        throws StateStoreException {
+        throws StateStoreException, IOException {
         List<EntityState> entityStates = new ArrayList<>();
         if (entityBeans != null && !entityBeans.isEmpty()) {
             for (EntityBean entityBean : entityBeans) {
@@ -306,6 +325,18 @@ public final class BeanMapperUtil {
         }
     }
 
+    public static byte [] getProperties(EntityState entityState) throws IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        ObjectOutputStream out = null;
+        try {
+            out = new ObjectOutputStream(byteArrayOutputStream);
+            out.writeObject(entityState.getProperties());
+            return byteArrayOutputStream.toByteArray();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+    }
+
     /**
      * @param summary
      * @return A map of state and count given the JQL result.

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 38d9217..825fbc1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -75,11 +75,16 @@ public final class JDBCStateStore extends AbstractStateStore {
         if (entityExists(entityID)) {
             throw new StateStoreException("Entity with key, " + key + " already exists.");
         }
-        EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState);
-        EntityManager entityManager = getEntityManager();
-        beginTransaction(entityManager);
-        entityManager.persist(entityBean);
-        commitAndCloseTransaction(entityManager);
+        EntityBean entityBean = null;
+        try {
+            entityBean = BeanMapperUtil.convertToEntityBean(entityState);
+            EntityManager entityManager = getEntityManager();
+            beginTransaction(entityManager);
+            entityManager.persist(entityBean);
+            commitAndCloseTransaction(entityManager);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
     }
 
 
@@ -97,7 +102,11 @@ public final class JDBCStateStore extends AbstractStateStore {
         if (entityBean == null) {
             return null;
         }
-        return BeanMapperUtil.convertToEntityState(entityBean);
+        try {
+            return BeanMapperUtil.convertToEntityState(entityBean);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
     }
 
     private EntityBean getEntityBean(EntityID id) {
@@ -133,7 +142,11 @@ public final class JDBCStateStore extends AbstractStateStore {
         Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ENTITIES);
         List result = q.getResultList();
         entityManager.close();
-        return BeanMapperUtil.convertToEntityState(result);
+        try {
+            return BeanMapperUtil.convertToEntityState(result);
+        } catch (IOException e) {
+            throw new StateStoreException(e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
index 49e083c..29b3bbb 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -35,10 +35,11 @@ public interface DAGEngine {
      * Run an instance for execution.
      *
      * @param instance
+     * @param props
      * @return
      * @throws DAGEngineException
      */
-    String run(ExecutionInstance instance) throws DAGEngineException;
+    String run(ExecutionInstance instance, Properties props) throws DAGEngineException;
 
     /**
      * @param instance
@@ -85,9 +86,10 @@ public interface DAGEngine {
      * Perform dryrun of an instance.
      *
      * @param entity
+     * @param props
      * @throws DAGEngineException
      */
-    void submit(Entity entity) throws DAGEngineException;
+    void submit(Entity entity, Properties props) throws DAGEngineException;
 
     /**
      * Returns info about the Job.

http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index c6d4212..6dbec0c 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
     public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun";
     public static final String FALCON_RERUN = "falcon.system.rerun";
+    public static final String FALCON_SKIP_DRYRUN = "falcon.system.skip.dryrun";
     public static final String FALCON_RESUME = "falcon.system.resume";
 
     private enum JobAction {
@@ -85,13 +86,24 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
     }
 
     @Override
-    public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException {
-        EXECUTION_SERVICE.schedule(entity);
+    public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException {
+        Properties props  = new Properties();
+        if (suppliedProps != null && !suppliedProps.isEmpty()) {
+            props.putAll(suppliedProps);
+        }
+        if (skipDryRun) {
+            props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true");
+        }
+        EXECUTION_SERVICE.schedule(entity, props);
     }
 
     @Override
     public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
-        DAGEngineFactory.getDAGEngine(clusterName).submit(entity);
+        Properties props  = new Properties();
+        if (skipDryRun) {
+            props.put(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true");
+        }
+        DAGEngineFactory.getDAGEngine(clusterName).submit(entity, props);
     }
 
     @Override