You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/04/19 08:34:59 UTC
[2/2] falcon git commit: FALCON-1802 EL Expressions support in Native
Scheduler
FALCON-1802 EL Expressions support in Native Scheduler
Author: pavankumar526 <pa...@gmail.com>
Reviewers: "sandeepSamudrala <sa...@gmail.com>, Ajay Yadava <aj...@apache.org>"
Closes #57 from pavankumar526/FALCON-1802
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/7e4dc0d9
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/7e4dc0d9
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/7e4dc0d9
Branch: refs/heads/master
Commit: 7e4dc0d923b5ef3d44f7fa5c85681d6fa89e0caf
Parents: 62393fe
Author: pavankumar526 <pa...@gmail.com>
Authored: Tue Apr 19 12:03:57 2016 +0530
Committer: pavankumar526 <pa...@gmail.com>
Committed: Tue Apr 19 12:03:57 2016 +0530
----------------------------------------------------------------------
.../org/apache/falcon/entity/EntityUtil.java | 27 +++
.../falcon/expression/ExpressionHelper.java | 44 ++++
.../apache/falcon/persistence/EntityBean.java | 15 +-
.../org/apache/falcon/util/CalendarUnit.java | 39 ++++
.../java/org/apache/falcon/util/DateUtil.java | 175 +++++++++++++++
.../falcon/expression/ExpressionHelperTest.java | 24 ++
oozie/pom.xml | 6 +
.../apache/falcon/oozie/OozieEntityBuilder.java | 9 +
.../OozieOrchestrationWorkflowBuilder.java | 29 ++-
.../NativeOozieProcessWorkflowBuilder.java | 222 +++++++++++++++++++
.../execution/FalconExecutionService.java | 16 +-
.../execution/ProcessExecutionInstance.java | 109 ++++++---
.../falcon/execution/ProcessExecutor.java | 37 +++-
.../apache/falcon/execution/SchedulerUtil.java | 34 +++
.../service/impl/DataAvailabilityService.java | 9 +-
.../service/impl/SchedulerService.java | 6 +-
.../request/DataNotificationRequest.java | 17 ++
.../org/apache/falcon/predicate/Predicate.java | 1 +
.../org/apache/falcon/state/EntityState.java | 11 +
.../org/apache/falcon/state/InstanceState.java | 6 +-
.../org/apache/falcon/state/StateService.java | 19 +-
.../falcon/state/store/EntityStateStore.java | 2 +-
.../falcon/state/store/jdbc/BeanMapperUtil.java | 37 +++-
.../falcon/state/store/jdbc/JDBCStateStore.java | 27 ++-
.../falcon/workflow/engine/DAGEngine.java | 6 +-
.../workflow/engine/FalconWorkflowEngine.java | 18 +-
.../falcon/workflow/engine/OozieDAGEngine.java | 75 +++----
.../execution/FalconExecutionServiceTest.java | 7 +-
.../apache/falcon/execution/MockDAGEngine.java | 4 +-
scheduler/src/test/resources/runtime.properties | 25 +++
src/conf/runtime.properties | 11 +
.../apache/falcon/unit/FalconUnitTestBase.java | 2 +-
.../AbstractSchedulerManagerJerseyIT.java | 7 +-
.../InstanceSchedulerManagerJerseyIT.java | 35 ++-
.../resources/process-nolatedata-template.xml | 50 +++++
35 files changed, 1040 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 96befa1..8825a65 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -94,6 +94,15 @@ public final class EntityUtil {
public static final String WF_LIB_SEPARATOR = ",";
private static final String STAGING_DIR_NAME_SEPARATOR = "_";
+ public static final ThreadLocal<SimpleDateFormat> PATH_FORMAT = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmm"); // one per thread: SimpleDateFormat is not thread-safe
+ format.setTimeZone(TimeZone.getTimeZone("UTC")); // fixed-width UTC stamp; evaluateDependentPath slices it by index
+ return format;
+ }
+ };
+
/** Priority with which the DAG will be scheduled.
* Matches the five priorities of Hadoop jobs.
*/
@@ -1082,4 +1091,22 @@ public final class EntityUtil {
}
return JOBPRIORITY.NORMAL;
}
+
+
+ /**
+ * Resolves the ${YEAR}, ${MONTH}, ${DAY}, ${HOUR} and ${MINUTE} variables of a feed path for an instance time.
+ * @param feedPath feed location template that may contain the time variables
+ * @param instanceTime instance time; its UTC calendar fields are substituted (formatted via PATH_FORMAT)
+ * @return the path with every occurrence of each variable replaced by its zero-padded UTC value
+ */
+ public static String evaluateDependentPath(String feedPath, Date instanceTime) {
+ String timestamp = PATH_FORMAT.get().format(instanceTime);
+ String instancePath = feedPath.replaceAll("\\$\\{YEAR\\}", timestamp.substring(0, 4));
+ instancePath = instancePath.replaceAll("\\$\\{MONTH\\}", timestamp.substring(4, 6));
+ instancePath = instancePath.replaceAll("\\$\\{DAY\\}", timestamp.substring(6, 8));
+ instancePath = instancePath.replaceAll("\\$\\{HOUR\\}", timestamp.substring(8, 10));
+ instancePath = instancePath.replaceAll("\\$\\{MINUTE\\}", timestamp.substring(10, 12));
+ return instancePath;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 65aaeba..451cbba 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -21,6 +21,8 @@ package org.apache.falcon.expression;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.util.CalendarUnit;
+import org.apache.falcon.util.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,6 +31,7 @@ import javax.servlet.jsp.el.ExpressionEvaluator;
import javax.servlet.jsp.el.FunctionMapper;
import javax.servlet.jsp.el.VariableResolver;
import java.lang.reflect.Method;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
@@ -53,6 +56,7 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
private static final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
private static final ExpressionHelper RESOLVER = ExpressionHelper.get();
+
public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
@@ -257,4 +261,44 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
return originalValue;
}
+ /**
+ * EL function: parses a datetime in DateUtil's active ISO8601 mask and re-formats it with a custom pattern.
+ * @param dateTimeStr datetime to convert, e.g. "2016-02-01T10:59Z" while DateUtil is in its default UTC mode
+ * @param format {@link java.text.SimpleDateFormat} pattern for the result, e.g. "yyyy-MM"
+ * @return dateTimeStr rendered with format in the process (active) timezone
+ * @throws ParseException if dateTimeStr does not match DateUtil's active ISO8601 mask
+ */
+ public static String formatTime(String dateTimeStr, String format) throws ParseException {
+ Date dateTime = DateUtil.parseDateFalconTZ(dateTimeStr);
+ return DateUtil.formatDateCustom(dateTime, format);
+ }
+
+ /**
+ * EL function: returns the reference (instance) date formatted as ISO8601 in the process timezone.
+ * @return referenceDate.get() formatted by DateUtil.formatDateFalconTZ, or the literal "NULL" if it is null
+ */
+ public static String instanceTime() {
+ return DateUtil.formatDateFalconTZ(referenceDate.get());
+ }
+
+ /**
+ * EL function: computes newDate = baseDate + offset * unit and returns it as ISO8601 in the process timezone.
+ * @param strBaseDate base datetime in DateUtil's active mask (parsed via DateUtil.getCalendar)
+ * @param offset number of units to add; negative values move backwards (e.g. -25 MINUTE)
+ * @param unit exact, case-sensitive name of a CalendarUnit constant, e.g. "DAY", "HOUR", "MINUTE"
+ * @return the shifted datetime formatted by DateUtil.formatDateFalconTZ
+ * @throws Exception if strBaseDate cannot be parsed; an unknown unit name raises IllegalArgumentException
+ */
+ public static String dateOffset(String strBaseDate, int offset, String unit) throws Exception {
+ Calendar baseCalDate = DateUtil.getCalendar(strBaseDate);
+ StringBuilder buffer = new StringBuilder();
+ baseCalDate.add(CalendarUnit.valueOf(unit).getCalendarUnit(), offset); // NOTE(review): CRON/NONE map to 0 (Calendar.ERA) / -1, not usable here
+ buffer.append(DateUtil.formatDateFalconTZ(baseCalDate));
+ return buffer.toString();
+ }
+
+ public static String user() { // EL function: returns the literal "${user.name}" placeholder, presumably resolved later (e.g. by Oozie) - confirm
+ return "${user.name}";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
index 274305c..f1c9cf3 100644
--- a/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/EntityBean.java
@@ -24,6 +24,7 @@ import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
+import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
@@ -38,7 +39,7 @@ import java.util.List;
@NamedQueries({
@NamedQuery(name = PersistenceConstants.GET_ENTITY, query = "select OBJECT(a) from EntityBean a where a.id = :id"),
@NamedQuery(name = PersistenceConstants.GET_ENTITY_FOR_STATE, query = "select OBJECT(a) from EntityBean a where a.state = :state"),
- @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type where a.id = :id"),
+ @NamedQuery(name = PersistenceConstants.UPDATE_ENTITY, query = "update EntityBean a set a.state = :state, a.name = :name, a.type = :type, a.properties = :properties where a.id = :id"),
@NamedQuery(name = PersistenceConstants.GET_ENTITIES_FOR_TYPE, query = "select OBJECT(a) from EntityBean a where a.type = :type"),
@NamedQuery(name = PersistenceConstants.GET_ENTITIES, query = "select OBJECT(a) from EntityBean a"),
@NamedQuery(name = PersistenceConstants.DELETE_ENTITY, query = "delete from EntityBean a where a.id = :id"),
@@ -68,6 +69,10 @@ public class EntityBean {
@Column(name = "current_state")
private String state;
+ @Column(name = "properties")
+ @Lob
+ private byte[] properties; // opaque serialized entity properties (LOB), also written by UPDATE_ENTITY; encoding chosen by callers - confirm
+
@OneToMany(cascade= CascadeType.REMOVE, mappedBy="entityBean")
private List<InstanceBean> instanceBeans;
@@ -113,5 +118,13 @@ public class EntityBean {
public void setInstanceBeans(List<InstanceBean> instanceBeans) {
this.instanceBeans = instanceBeans;
}
+
+ public byte[] getProperties() {
+ return properties; // returns the internal array itself (no defensive copy)
+ }
+
+ public void setProperties(byte[] properties) {
+ this.properties = properties; // keeps the caller's array reference (no copy)
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/CalendarUnit.java b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
new file mode 100644
index 0000000..f2a40b9
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/util/CalendarUnit.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import java.util.Calendar;
+
+/**
+ * Units for date arithmetic; each constant carries the {@link java.util.Calendar} field id passed to Calendar#add.
+ */
+public enum CalendarUnit {
+ MINUTE(Calendar.MINUTE), HOUR(Calendar.HOUR), DAY(Calendar.DATE), MONTH(Calendar.MONTH), // Calendar#add treats HOUR like HOUR_OF_DAY
+ YEAR(Calendar.YEAR), END_OF_DAY(Calendar.DATE), END_OF_MONTH(Calendar.MONTH), CRON(0), NONE(-1); // NOTE(review): CRON(0) == Calendar.ERA, NONE(-1) is not a Calendar field
+
+ private int calendarUnit;
+
+ private CalendarUnit(int calendarUnit) {
+ this.calendarUnit = calendarUnit;
+ }
+
+ public int getCalendarUnit() { // the java.util.Calendar field constant for this unit
+ return calendarUnit;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index baf5b13..9e9b8e8 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -17,12 +17,20 @@
*/
package org.apache.falcon.util;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.Frequency;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.ParsePosition;
+import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
/**
* Helper to get date operations.
@@ -39,8 +47,37 @@ public final class DateUtil {
public static final long HOUR_IN_MILLIS = 60 * 60 * 1000;
+ private static final Pattern GMT_OFFSET_COLON_PATTERN = Pattern.compile("^GMT(\\-|\\+)(\\d{2})(\\d{2})$"); // GMT(+|-)hhmm
+
+ public static final TimeZone UTC = getTimeZone("UTC"); // keep below GMT_OFFSET_COLON_PATTERN: getTimeZone reads it during static init
+
+ public static final String ISO8601_UTC_MASK = "yyyy-MM-dd'T'HH:mm'Z'";
+ private static String activeTimeMask = ISO8601_UTC_MASK; // NOTE(review): mutable static state, reassigned by setTimeZone unsynchronized
+ private static TimeZone activeTimeZone = UTC; // NOTE(review): mutable static state, reassigned by setTimeZone unsynchronized
+
+ private static final Pattern VALID_TIMEZONE_PATTERN = Pattern.compile("^UTC$|^GMT(\\+|\\-)\\d{4}$"); // ids accepted by setTimeZone
+
+ private static final String ISO8601_TZ_MASK_WITHOUT_OFFSET = "yyyy-MM-dd'T'HH:mm"; // setTimeZone appends the literal (+|-)hhmm offset
+ private static boolean entityInUTC = true; // true while activeTimeZone is UTC; updated by setTimeZone
+
private DateUtil() {}
+ /**
+ * Sets the process timezone (blank tz means UTC) used by the DateUtil parse/format helpers and getCalendar.
+ * @throws FalconException if tz is neither 'UTC' nor of the form 'GMT(+|-)hhmm', e.g. GMT+0530
+ */
+ public static void setTimeZone(String tz) throws FalconException {
+ if (StringUtils.isBlank(tz)) {
+ tz = "UTC";
+ }
+ if (!VALID_TIMEZONE_PATTERN.matcher(tz).matches()) {
+ throw new FalconException("Invalid entity timezone, it must be 'UTC' or 'GMT(+/-)####"); // NOTE(review): message lacks a closing quote
+ }
+ activeTimeZone = TimeZone.getTimeZone(tz); // NOTE(review): JVM-wide, unsynchronized write; concurrent callers with different tz race
+ entityInUTC = activeTimeZone.equals(UTC);
+ activeTimeMask = (entityInUTC) ? ISO8601_UTC_MASK : ISO8601_TZ_MASK_WITHOUT_OFFSET + tz.substring(3); // e.g. GMT+0530 -> mask ending in literal +0530
+ }
+
public static Date getNextMinute(Date time) throws Exception {
Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
insCal.setTime(time);
@@ -99,4 +136,142 @@ public final class DateUtil {
public static Date offsetTime(Date date, int seconds) {
return new Date(1000L * seconds + date.getTime());
}
+
+ /**
+ * Parses a datetime in ISO8601 format in the process timezone.
+ * Parsing is non-lenient and uses the active mask; characters after a valid datetime are rejected.
+ * @param s string with the datetime to parse (must not be null; surrounding whitespace is trimmed).
+ * @return the corresponding {@link java.util.Date} instance for the parsed date.
+ * @throws java.text.ParseException thrown if the given string was
+ * not an ISO8601 value for the process timezone.
+ */
+ public static Date parseDateFalconTZ(String s) throws ParseException {
+ s = s.trim();
+ ParsePosition pos = new ParsePosition(0);
+ Date d = getISO8601DateFormat(activeTimeZone, activeTimeMask).parse(s, pos);
+ if (d == null) {
+ throw new ParseException("Could not parse [" + s + "] using [" + activeTimeMask + "] mask",
+ pos.getErrorIndex());
+ }
+ if (s.length() > pos.getIndex()) {
+ throw new ParseException("Correct datetime string is followed by invalid characters: " + s, pos.getIndex());
+ }
+ return d;
+ }
+
+ private static DateFormat getISO8601DateFormat(TimeZone tz, String mask) { // fresh strict formatter per call (SimpleDateFormat is not thread-safe)
+ DateFormat dateFormat = new SimpleDateFormat(mask);
+ // Stricter parsing to prevent dates such as 2011-12-50T01:00Z (December 50th) from matching
+ dateFormat.setLenient(false);
+ dateFormat.setTimeZone(tz);
+ return dateFormat;
+ }
+
+ private static DateFormat getSpecificDateFormat(String format) { // fresh per call; default (lenient) parsing, bound to the active timezone
+ DateFormat dateFormat = new SimpleDateFormat(format);
+ dateFormat.setTimeZone(activeTimeZone);
+ return dateFormat;
+ }
+
+ /**
+ * Formats a {@link java.util.Date} as a string using the specified format mask, in the process timezone.
+ * <p/>
+ * The format mask must be a {@link java.text.SimpleDateFormat} valid format mask.
+ * @param d {@link java.util.Date} to format.
+ * @param format the {@link java.text.SimpleDateFormat} pattern to apply.
+ * @return the string for the given date using the specified format mask, or the literal string
+ * <code>"NULL"</code> if the {@link java.util.Date} instance was <code>null</code>
+ */
+ public static String formatDateCustom(Date d, String format) {
+ return (d != null) ? getSpecificDateFormat(format).format(d) : "NULL";
+ }
+
+ /**
+ * Formats a {@link java.util.Date} as a string in ISO8601 format using the active (process) mask and timezone.
+ *
+ * @param d {@link java.util.Date} to format.
+ * @return the ISO8601 string for the given date, or the literal string <code>"NULL"</code> if the
+ * {@link java.util.Date} instance was <code>null</code>
+ */
+ public static String formatDateFalconTZ(Date d) {
+ return (d != null) ? getISO8601DateFormat(activeTimeZone, activeTimeMask).format(d) : "NULL";
+ }
+
+ /**
+ * Returns the {@link java.util.TimeZone} for the given ID, accepting GMT(+|-)hhmm besides regular IDs.
+ * @param tzId timezone ID; GMT(+|-)hhmm is first normalized to GMT(+|-)hh:mm.
+ * @return the matching {@link java.util.TimeZone}.
+ * @throws IllegalArgumentException if tzId is null or is not a valid timezone ID.
+ */
+ public static TimeZone getTimeZone(String tzId) {
+ if (tzId == null) {
+ throw new IllegalArgumentException("Timezone cannot be null");
+ }
+ tzId = handleGMTOffsetTZNames(tzId); // account for GMT-####
+ TimeZone tz = TimeZone.getTimeZone(tzId);
+ // If these are not equal, it means that the tzId is not valid (invalid tzId's return GMT)
+ if (!tz.getID().equals(tzId)) {
+ throw new IllegalArgumentException("Invalid TimeZone: " + tzId);
+ }
+ return tz;
+ }
+
+ /**
+ * {@link java.util.TimeZone#getTimeZone(String)} takes the timezone ID as an argument; for invalid IDs
+ * it returns the <code>GMT</code> TimeZone. A timezone ID formatted like <code>GMT-####</code> is not a valid ID,
+ * however, it will actually map this to the <code>GMT-##:##</code> TimeZone, instead of returning the
+ * <code>GMT</code> TimeZone. We (later) check that a timezone ID is valid by calling
+ * {@link java.util.TimeZone#getTimeZone(String)} and seeing if the returned
+ * TimeZone ID is equal to the original; because we want to allow <code>GMT-####</code>, while still
+ * disallowing actual invalid IDs, we have to manually replace <code>GMT-####</code>
+ * with <code>GMT-##:##</code> first.
+ *
+ * @param tzId The timezone ID
+ * @return If tzId matches <code>GMT-####</code>, then we return <code>GMT-##:##</code>; otherwise,
+ * we return tzId unaltered
+ */
+ private static String handleGMTOffsetTZNames(String tzId) {
+ Matcher m = GMT_OFFSET_COLON_PATTERN.matcher(tzId);
+ if (m.matches() && m.groupCount() == 3) { // groupCount() is always 3 for this pattern; matches() is the real test
+ tzId = "GMT" + m.group(1) + m.group(2) + ":" + m.group(3);
+ }
+ return tzId;
+ }
+
+ /**
+ * Creates a Calendar for the given datetime string, in the process (active) timezone.
+ * @param dateString datetime in the active ISO8601 mask, e.g. 2016-02-01T10:59Z while the timezone is UTC
+ * @return a Calendar holding the parsed instant, with activeTimeZone set
+ * @throws Exception if dateString cannot be parsed (ParseException from parseDateFalconTZ)
+ */
+ public static Calendar getCalendar(String dateString) throws Exception {
+ return getCalendar(dateString, activeTimeZone);
+ }
+
+ /**
+ * Creates a Calendar for the instant denoted by dateString, with its fields expressed in tz.
+ * @param dateString ISO8601 datetime; always parsed with the active mask/timezone, not with tz
+ * @param tz timezone set on the returned Calendar (affects field values and add(), not the parsed instant)
+ * @return a Calendar holding the parsed instant
+ * @throws Exception if dateString cannot be parsed (ParseException from parseDateFalconTZ)
+ */
+ public static Calendar getCalendar(String dateString, TimeZone tz) throws Exception {
+ Date date = DateUtil.parseDateFalconTZ(dateString);
+ Calendar calDate = Calendar.getInstance();
+ calDate.setTime(date);
+ calDate.setTimeZone(tz);
+ return calDate;
+ }
+
+ /**
+ * Formats a {@link java.util.Calendar} as ISO8601 using the process timezone; the Calendar's own zone is ignored.
+ *
+ * @param c {@link java.util.Calendar} to format.
+ * @return the ISO8601 string for the given date, <code>NULL</code> if the {@link java.util.Calendar} instance was
+ * <code>NULL</code>
+ */
+ public static String formatDateFalconTZ(Calendar c) {
+ return (c != null) ? formatDateFalconTZ(c.getTime()) : "NULL";
+ }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
index da5dbca..b3895c3 100644
--- a/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
+++ b/common/src/test/java/org/apache/falcon/expression/ExpressionHelperTest.java
@@ -81,4 +81,28 @@ public class ExpressionHelperTest {
{"future(1,0)", "2015-02-01T00:00Z"},
};
}
+
+ @Test
+ public void testFormatTime() throws FalconException { // inputs rely on DateUtil's default UTC mask (no setTimeZone call here)
+ String output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy\")",
+ String.class);
+ Assert.assertEquals(output, "2016");
+ output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM\")",
+ String.class);
+ Assert.assertEquals(output, "2016-02");
+ output = expressionHelper.evaluate("formatTime(\"2016-02-01T10:59Z\", \"yyyy-MM-dd\")",
+ String.class);
+ Assert.assertEquals(output, "2016-02-01");
+ }
+
+
+ @Test
+ public void testOffsetAndInstanceTime() throws FalconException { // asserts imply reference time 2015-02-01T00:00Z, presumably set in setup (not shown)
+ String date = expressionHelper.evaluate("dateOffset(instanceTime(), 1, 'DAY')", String.class);
+ Assert.assertEquals(date, "2015-02-02T00:00Z");
+ date = expressionHelper.evaluate("dateOffset(instanceTime(), 3, 'HOUR')", String.class);
+ Assert.assertEquals(date, "2015-02-01T03:00Z");
+ date = expressionHelper.evaluate("dateOffset(instanceTime(), -25, 'MINUTE')", String.class);
+ Assert.assertEquals(date, "2015-01-31T23:35Z");
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index 04b3df6..c14c625 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -103,6 +103,12 @@
<scope>compile</scope>
</dependency>
+ <dependency> <!-- org.joda.time is imported by OozieOrchestrationWorkflowBuilder and NativeOozieProcessWorkflowBuilder -->
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>${joda.version}</version> <!-- joda.version presumably defined in the parent pom - confirm -->
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index a36ee79..a856f8a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -109,6 +109,14 @@ public abstract class OozieEntityBuilder<T extends Entity> {
public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException;
public Properties build(Cluster cluster, Path buildPath, Map<String, String> properties) throws FalconException {
+ Properties props = new Properties(); // adapts the Map overload onto the Properties overload
+ if (properties != null) {
+ props.putAll(properties); // NOTE(review): Properties is a Hashtable - a null key or value here throws NullPointerException
+ }
+ return build(cluster, buildPath, props); // a null map is passed on as an empty Properties
+ }
+
+ public Properties build(Cluster cluster, Path buildPath, Properties properties) throws FalconException {
Properties builderProperties = build(cluster, buildPath);
if (properties == null || properties.isEmpty()) {
return builderProperties;
@@ -130,6 +138,7 @@ public abstract class OozieEntityBuilder<T extends Entity> {
return propertiesCopy;
}
+
protected String getStoragePath(Path path) {
if (path != null) {
return getStoragePath(path.toString());
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 181f2d2..562627e 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -37,6 +37,7 @@ import org.apache.falcon.oozie.feed.FSReplicationWorkflowBuilder;
import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder;
import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.NativeOozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
@@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.joda.time.DateTime;
import javax.xml.bind.JAXBElement;
import javax.xml.namespace.QName;
@@ -93,11 +95,19 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
private LifeCycle lifecycle;
+ private DateTime nominalTime; // instance nominal time supplied via setNominalTime; null unless a caller sets it
protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
protected static final String MR_QUEUE_NAME = "queueName";
protected static final String MR_JOB_PRIORITY = "jobPriority";
+ /**
+ * Scheduler a workflow is built for; NATIVE selects NativeOozieProcessWorkflowBuilder for OOZIE-engine processes.
+ */
+ public enum Scheduler {
+ OOZIE, NATIVE
+ }
+
public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
super(entity);
this.lifecycle = lifecycle;
@@ -115,7 +125,13 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
super(entity);
}
- public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster,
+ Tag lifecycle) throws FalconException {
+ return get(entity, cluster, lifecycle, Scheduler.OOZIE); // defaults to the Oozie scheduler, keeping this overload's prior behaviour
+ }
+
+ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
+ Scheduler scheduler)
throws FalconException {
switch (entity.getEntityType()) {
case FEED:
@@ -166,6 +182,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return new PigProcessWorkflowBuilder(process);
case OOZIE:
+ if (Scheduler.NATIVE == scheduler) {
+ return new NativeOozieProcessWorkflowBuilder(process);
+ }
return new OozieProcessWorkflowBuilder(process);
case HIVE:
@@ -497,4 +516,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
return conf;
}
+
+ public void setNominalTime(DateTime nominalTime) { // records the instance nominal time for use while building the workflow
+ this.nominalTime = nominalTime;
+ }
+
+ public DateTime getNominalTime() { // null unless setNominalTime was called
+ return nominalTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
new file mode 100644
index 0000000..78e049d
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/NativeOozieProcessWorkflowBuilder.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.util.DateUtil;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.hadoop.fs.Path;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Workflow Builder for oozie process in case of Native Scheduler.
+ */
+public class NativeOozieProcessWorkflowBuilder extends OozieProcessWorkflowBuilder {
+
+ private static final ExpressionHelper EXPRESSION_HELPER = ExpressionHelper.get();
+ private static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm";
+
+ public NativeOozieProcessWorkflowBuilder(org.apache.falcon.entity.v0.process.Process entity) {
+ super(entity);
+ }
+
+ @Override
+ public java.util.Properties build(Cluster cluster,
+ Path buildPath, Properties suppliedProps) throws FalconException {
+ Properties elProps = new Properties();
+ DateTimeFormatter fmt = DateTimeFormat.forPattern(INSTANCE_FORMAT);
+ elProps.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), fmt.print(getNominalTime()));
+ elProps.put(WorkflowExecutionArgs.TIMESTAMP.getName(), fmt.print(getNominalTime()));
+ elProps.put(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED.getName(), "true");
+ elProps.put(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED.getName(), "false"); //check true or false
+
+
+ DateUtil.setTimeZone(entity.getTimezone().getID());
+ ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+ elProps.putAll(getInputProps(cluster));
+ elProps.putAll(getOutputProps());
+ elProps.putAll(evalProperties());
+ Properties buildProps = build(cluster, buildPath);
+ buildProps.putAll(elProps);
+ copyPropsWithoutOverride(buildProps, suppliedProps);
+ return buildProps;
+ }
+
+ private void copyPropsWithoutOverride(Properties buildProps, Properties suppliedProps) {
+ if (suppliedProps == null || suppliedProps.isEmpty()) {
+ return;
+ }
+ for (String propertyName : suppliedProps.stringPropertyNames()) {
+ if (buildProps.containsKey(propertyName)) {
+ LOG.warn("User provided property {} is already declared in the entity and will be ignored.",
+ propertyName);
+ continue;
+ }
+ String propertyValue = suppliedProps.getProperty(propertyName);
+ buildProps.put(propertyName, propertyValue);
+ }
+ }
+
+ private Properties evalProperties() throws FalconException {
+ Properties props = new Properties();
+ org.apache.falcon.entity.v0.process.Properties processProps = entity.getProperties();
+ for (Property property : processProps.getProperties()) {
+ String propName = property.getName();
+ String propValue = property.getValue();
+ String evalExp = EXPRESSION_HELPER.evaluateFullExpression(propValue, String.class);
+ props.put(propName, evalExp);
+ }
+ return props;
+ }
+
+ private Properties getOutputProps() throws FalconException {
+ Properties props = new Properties();
+ if (entity.getOutputs() == null) {
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), NONE);
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), NONE);
+ return props;
+ }
+ List<String> feedNames = new ArrayList<>();
+ List<String> feedInstancePaths= new ArrayList<>();
+ for (Output output : entity.getOutputs().getOutputs()) {
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, output.getFeed());
+ feedNames.add(feed.getName());
+ String outputExp = output.getInstance();
+ Date outTime = EXPRESSION_HELPER.evaluate(outputExp, Date.class);
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+ EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+ if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+ continue;
+ }
+
+ List<Location> locations = FeedHelper.getLocations(cluster, feed);
+ for (Location loc : locations) {
+ String path = EntityUtil.evaluateDependentPath(loc.getPath(), outTime);
+ path = getStoragePath(path);
+ if (loc.getType() != LocationType.DATA) {
+ props.put(output.getName() + "." + loc.getType().toString().toLowerCase(), path);
+ } else {
+ props.put(output.getName(), path);
+ }
+ feedInstancePaths.add(path);
+ }
+ }
+ }
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), StringUtils.join(feedNames, ","));
+ props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), StringUtils.join(feedInstancePaths, ","));
+ return props;
+ }
+
+ private Properties getInputProps(Cluster clusterObj) throws FalconException {
+ Properties props = new Properties();
+
+ if (entity.getInputs() == null) {
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), NONE);
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), NONE);
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), NONE);
+ props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), NONE);
+ return props;
+ }
+ List<String> falconInputFeeds = new ArrayList<>();
+ List<String> falconInputNames = new ArrayList<>();
+ List<String> falconInputPaths = new ArrayList<>();
+ List<String> falconInputFeedStorageTypes = new ArrayList<>();
+ for (Input input : entity.getInputs().getInputs()) {
+ Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(clusterObj, feed);
+ if (storage.getType() != Storage.TYPE.FILESYSTEM) {
+ throw new UnsupportedOperationException("Storage Type not supported " + storage.getType());
+ }
+ falconInputFeeds.add(feed.getName());
+ falconInputNames.add(input.getName());
+ falconInputFeedStorageTypes.add(storage.getType().name());
+ String partition = input.getPartition();
+
+ String startTimeExp = input.getStart();
+ String endTimeExp = input.getEnd();
+ ExpressionHelper.setReferenceDate(new Date(getNominalTime().getMillis()));
+ Date startTime = EXPRESSION_HELPER.evaluate(startTimeExp, Date.class);
+ Date endTime = EXPRESSION_HELPER.evaluate(endTimeExp, Date.class);
+
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+ EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+ if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+ continue;
+ }
+
+ List<Location> locations = FeedHelper.getLocations(cluster, feed);
+ for (Location loc : locations) {
+ if (loc.getType() != LocationType.DATA) {
+ continue;
+ }
+ List<String> paths = new ArrayList<>();
+ List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+ startTime, endTime); // test when startTime and endTime are equal.
+ for (Date instanceTime : instanceTimes) {
+ String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
+ if (StringUtils.isNotBlank(partition)) {
+ if (!path.endsWith("/") && !partition.startsWith("/")) {
+ path = path + "/";
+ }
+ path = path + partition;
+ }
+ path = getStoragePath(path);
+ paths.add(path);
+ }
+ if (loc.getType() != LocationType.DATA) {
+ props.put(input.getName() + "." + loc.getType().toString().toLowerCase(),
+ StringUtils.join(paths, ","));
+ } else {
+ props.put(input.getName(), StringUtils.join(paths, ","));
+ }
+ falconInputPaths.add(StringUtils.join(paths, ","));
+ }
+ }
+ }
+ props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), StringUtils.join(falconInputFeeds, "#"));
+ props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), StringUtils.join(falconInputNames, "#"));
+ props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), StringUtils.join(falconInputPaths, "#"));
+ props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(),
+ StringUtils.join(falconInputFeedStorageTypes, "#"));
+ return props;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
index 93c894d..a969c7a 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/FalconExecutionService.java
@@ -35,6 +35,7 @@ import org.apache.falcon.state.store.AbstractStateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -203,10 +204,11 @@ public final class FalconExecutionService implements FalconService, EntityStateC
* Schedules an entity.
*
* @param entity
+ * @param properties
* @throws FalconException
*/
- public void schedule(Entity entity) throws FalconException {
- StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this);
+ public void schedule(Entity entity, Properties properties) throws FalconException {
+ StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this, properties);
}
/**
@@ -256,4 +258,14 @@ public final class FalconExecutionService implements FalconService, EntityStateC
throw new FalconException("Entity executor for entity cluster key : " + id.getKey() + " does not exist.");
}
}
+
+    /**
+     * Schedules the given process without any user-supplied properties.
+     *
+     * <p>Convenience overload equivalent to {@code schedule(entity, null)}.</p>
+     *
+     * @param entity process to schedule
+     * @throws FalconException propagated from {@code schedule(Entity, Properties)}
+     */
+    public void schedule(Process entity) throws FalconException {
+        schedule(entity, (Properties) null);
+    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
index 2d666c3..49e1120 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutionInstance.java
@@ -30,6 +30,7 @@ import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.DataEvent;
import org.apache.falcon.notification.service.event.Event;
@@ -48,7 +49,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -61,12 +64,14 @@ import java.util.Properties;
public class ProcessExecutionInstance extends ExecutionInstance {
private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
private final Process process;
- private List<Predicate> awaitedPredicates = new ArrayList<>();
+ private List<Predicate> awaitedPredicates = Collections.synchronizedList(new ArrayList<Predicate>());
private DAGEngine dagEngine = null;
- private boolean hasTimedOut = false;
+ protected boolean hasTimedOut = false;
private InstanceID id;
private int instanceSequence;
+ private boolean areDataPredicatesEmpty;
private final FalconExecutionService executionService = FalconExecutionService.get();
+ private final ExpressionHelper expressionHelper = ExpressionHelper.get();
/**
* Constructor.
@@ -83,7 +88,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
this.id = new InstanceID(process, cluster, getInstanceTime());
computeInstanceSequence();
dagEngine = DAGEngineFactory.getDAGEngine(cluster);
- registerForNotifications(false);
+ areDataPredicatesEmpty = true;
}
/**
@@ -112,7 +117,7 @@ public class ProcessExecutionInstance extends ExecutionInstance {
// Currently, registers for only data notifications to ensure gating conditions are met.
// Can be extended to register for other notifications.
- private void registerForNotifications(boolean isResume) throws FalconException {
+ public void registerForNotifications(boolean isResume) throws FalconException {
if (process.getInputs() == null) {
return;
}
@@ -122,14 +127,40 @@ public class ProcessExecutionInstance extends ExecutionInstance {
continue;
}
Feed feed = ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
- List<Path> paths = new ArrayList<>();
+ String startTimeExp = input.getStart();
+ String endTimeExp = input.getEnd();
+ DateTime processInstanceTime = getInstanceTime();
+ expressionHelper.setReferenceDate(new Date(processInstanceTime.getMillis()));
+
+ Date startTime = expressionHelper.evaluate(startTimeExp, Date.class);
+ Date endTime = expressionHelper.evaluate(endTimeExp, Date.class);
+
for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ org.apache.falcon.entity.v0.cluster.Cluster clusterEntity =
+ EntityUtil.getEntity(EntityType.CLUSTER, cluster.getName());
+ if (!EntityUtil.responsibleFor(clusterEntity.getColo())) {
+ continue;
+ }
+ List<Path> paths = new ArrayList<>();
List<Location> locations = FeedHelper.getLocations(cluster, feed);
for (Location loc : locations) {
if (loc.getType() != LocationType.DATA) {
continue;
}
- paths.add(new Path(loc.getPath()));
+ List<Date> instanceTimes = EntityUtil.getEntityInstanceTimes(feed, cluster.getName(),
+ startTime, endTime);
+ for (Date instanceTime : instanceTimes) {
+ String path = EntityUtil.evaluateDependentPath(loc.getPath(), instanceTime);
+ if (feed.getAvailabilityFlag() != null && !feed.getAvailabilityFlag().isEmpty()) {
+ if (!path.endsWith("/")) {
+ path = path + "/";
+ }
+ path = path + feed.getAvailabilityFlag();
+ }
+ if (!paths.contains(new Path(path))) {
+ paths.add(new Path(path));
+ }
+ }
}
Predicate predicate = Predicate.createDataPredicate(paths);
@@ -137,21 +168,19 @@ public class ProcessExecutionInstance extends ExecutionInstance {
if (isResume && !awaitedPredicates.contains(predicate)) {
continue;
}
- // TODO : Revisit this once the Data Notification Service has been built
- // TODO Very IMP : Need to change the polling frequency
+ addDataPredicate(predicate);
DataAvailabilityService.DataRequestBuilder requestBuilder =
(DataAvailabilityService.DataRequestBuilder)
NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA)
.createRequestBuilder(executionService, getId());
requestBuilder.setLocations(paths)
.setCluster(cluster.getName())
- .setPollingFrequencyInMillis(100)
+ .setPollingFrequencyInMillis(SchedulerUtil.getPollingFrequencyinMillis(process.getFrequency()))
.setTimeoutInMillis(getTimeOutInMillis())
.setLocations(paths);
NotificationServicesRegistry.register(requestBuilder.build());
- LOG.info("Registered for a data notification for process {} for data location {}",
- process.getName(), StringUtils.join(paths, ","));
- awaitedPredicates.add(predicate);
+ LOG.info("Registered for a data notification for process {} of instance time {} for data location {}",
+ process.getName(), getInstanceTime(), StringUtils.join(paths, ","));
}
}
}
@@ -170,22 +199,26 @@ public class ProcessExecutionInstance extends ExecutionInstance {
case DATA_AVAILABLE:
// Data has not become available and the wait time has passed
if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
- if (getTimeOutInMillis() <= (System.currentTimeMillis() - getCreationTime().getMillis())) {
- hasTimedOut = true;
- }
- } else {
- // If the event matches any of the awaited predicates, remove the predicate of the awaited list
- Predicate toRemove = null;
- for (Predicate predicate : awaitedPredicates) {
- if (predicate.evaluate(Predicate.getPredicate(event))) {
- toRemove = predicate;
- break;
- }
- }
- if (toRemove != null) {
- awaitedPredicates.remove(toRemove);
+ hasTimedOut = true;
+ }
+ // If the event matches any of the awaited predicates, remove the predicate of the awaited list
+ Predicate toRemove = null;
+ synchronized (awaitedPredicates) {
+ Iterator<Predicate> iterator = awaitedPredicates.iterator();
+ while (iterator.hasNext()) {
+ Predicate predicate = iterator.next();
+ if (predicate.evaluate(Predicate.getPredicate(event))) {
+ toRemove = predicate;
+ break;
}
}
+ if (toRemove != null) {
+ awaitedPredicates.remove(toRemove);
+ }
+ if (awaitedPredicates.size() == 0) {
+ areDataPredicatesEmpty = true;
+ }
+ }
break;
default:
}
@@ -200,13 +233,16 @@ public class ProcessExecutionInstance extends ExecutionInstance {
if (awaitedPredicates.isEmpty()) {
return true;
} else {
- // If it is waiting to be scheduled, it is in ready.
- for (Predicate predicate : awaitedPredicates) {
- if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
- return false;
+ synchronized (awaitedPredicates) {
+ Iterator<Predicate> iterator = awaitedPredicates.iterator();
+ while (iterator.hasNext()) {
+ Predicate predicate = iterator.next();
+ if (!predicate.getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
+ return false;
+ }
}
+ return true;
}
- return true;
}
}
@@ -338,4 +374,15 @@ public class ProcessExecutionInstance extends ExecutionInstance {
public void rerun() throws FalconException {
registerForNotifications(false);
}
+
+    /**
+     * Returns whether no data predicate is currently pending for this instance.
+     *
+     * <p>The flag starts out {@code true}. It is cleared by {@link #addDataPredicate(Predicate)} and set
+     * again once the awaited predicate list becomes empty.</p>
+     *
+     * @return {@code true} if there are no outstanding data predicates
+     */
+    public boolean areDataAwaitingPredicatesEmpty() {
+        // The flag is written while holding the awaitedPredicates monitor, possibly from another thread.
+        // Read it under the same monitor so the caller sees the latest value.
+        synchronized (awaitedPredicates) {
+            return areDataPredicatesEmpty;
+        }
+    }
+
+    /**
+     * Adds a data predicate to the awaited list and marks data predicates as pending.
+     *
+     * @param predicate predicate to wait for
+     */
+    protected synchronized void addDataPredicate(Predicate predicate) {
+        synchronized (awaitedPredicates) {
+            awaitedPredicates.add(predicate);
+            areDataPredicatesEmpty = false;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
index 0fc68f0..fec5f31 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/ProcessExecutor.java
@@ -33,6 +33,7 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.InvalidStateTransitionException;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
+import org.apache.falcon.notification.service.event.DataEvent;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.JobCompletedEvent;
@@ -93,8 +94,9 @@ public class ProcessExecutor extends EntityExecutor {
initInstances();
}
// Check to handle restart and restoration from state store.
- if (STATE_STORE.getEntity(id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) {
- dryRun();
+ EntityState entityState = STATE_STORE.getEntity(id.getEntityID());
+ if (entityState.getCurrentState() != EntityState.STATE.SCHEDULED) {
+ dryRun(entityState.getProperties());
} else {
LOG.info("Process, {} was already scheduled on cluster, {}.", process.getName(), cluster);
LOG.info("Loading instances for process {} from state store.", process.getName());
@@ -103,8 +105,8 @@ public class ProcessExecutor extends EntityExecutor {
registerForNotifications(getLastInstanceTime());
}
- private void dryRun() throws FalconException {
- DAGEngineFactory.getDAGEngine(cluster).submit(process);
+ private void dryRun(Properties properties) throws FalconException {
+ DAGEngineFactory.getDAGEngine(cluster).submit(process, properties);
}
// Initializes the cache of execution instances. Cache is backed by the state store.
@@ -419,6 +421,30 @@ public class ProcessExecutor extends EntityExecutor {
stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
}
break;
+ case DATA_AVAILABLE:
+ instance = instances.get((InstanceID)event.getTarget());
+ instance.onEvent(event);
+ switch (((DataEvent) event).getStatus()) {
+ case AVAILABLE:
+ if (instance.areDataAwaitingPredicatesEmpty() && !instance.hasTimedOut) {
+ LOG.info("Data conditions met for instance {} and scheduled for running ", instance.getId());
+ stateService.handleStateChange(instance, InstanceState.EVENT.CONDITIONS_MET, this);
+ } else if (instance.areDataAwaitingPredicatesEmpty()) {
+ LOG.info("Instance {} timedout since input data not available", instance.getId());
+ stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+ } else {
+ STATE_STORE.updateExecutionInstance(new InstanceState(instance));
+ }
+ break;
+ case UNAVAILABLE:
+ if (instance.areDataAwaitingPredicatesEmpty()) {
+ stateService.handleStateChange(instance, InstanceState.EVENT.TIME_OUT, this);
+ }
+ break;
+ default:
+ throw new InvalidStateTransitionException("Invalid Data event status.");
+ }
+ break;
default:
if (isTriggerEvent(event)) {
instance = buildInstance(event);
@@ -473,7 +499,8 @@ public class ProcessExecutor extends EntityExecutor {
return event.getTarget().equals(id)
|| event.getType() == EventType.JOB_COMPLETED
|| event.getType() == EventType.JOB_SCHEDULED
- || event.getType() == EventType.RE_RUN;
+ || event.getType() == EventType.RE_RUN
+ || event.getType() == EventType.DATA_AVAILABLE;
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
index 3e7fc9b..236da11 100644
--- a/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/execution/SchedulerUtil.java
@@ -18,6 +18,7 @@
package org.apache.falcon.execution;
import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.util.RuntimeProperties;
import org.joda.time.DateTime;
/**
@@ -27,6 +28,14 @@ public final class SchedulerUtil {
private static final long MINUTE_IN_MS = 60 * 1000L;
private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+    /** Runtime property: polling interval (ms) for processes with a minutely frequency (default 20000). */
+ public static final String MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+ "falcon.scheduler.minutely.process.polling.frequency.millis";
+    /** Runtime property: polling interval (ms) for processes with an hourly frequency (default 60000). */
+ public static final String HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+ "falcon.scheduler.hourly.process.polling.frequency.millis";
+    /** Runtime property: polling interval (ms) for processes with a daily frequency (default 120000). */
+ public static final String DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+ "falcon.scheduler.daily.process.polling.frequency.millis";
+    /** Runtime property: polling interval (ms) for processes with a monthly frequency (default 180000). */
+ public static final String MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS =
+ "falcon.scheduler.monthly.process.polling.frequency.millis";
private SchedulerUtil(){};
@@ -51,4 +60,29 @@ public final class SchedulerUtil {
throw new IllegalArgumentException("Invalid time unit " + frequency.getTimeUnit().name());
}
}
+
+    /**
+     * Returns the polling interval, in milliseconds, configured for the time unit of {@code frequency}.
+     *
+     * <p>The value is read from runtime properties using the matching
+     * {@code *_PROCESS_FREQUENCY_POLLING_IN_MILLIS} key. When the key is absent, it falls back to
+     * 20000 (minutes), 60000 (hours), 120000 (days) or 180000 (months).</p>
+     *
+     * @param frequency process frequency whose time unit selects the interval
+     * @return the polling interval in milliseconds
+     * @throws IllegalArgumentException if the time unit is not handled
+     * @throws NumberFormatException if the configured value is not a valid long
+     */
+    public static long getPollingFrequencyinMillis(Frequency frequency) {
+        final String propertyName;
+        final String defaultMillis;
+        switch (frequency.getTimeUnit()) {
+        case minutes:
+            propertyName = MINUTELY_PROCESS_FREQUENCY_POLLING_IN_MILLIS;
+            defaultMillis = "20000";
+            break;
+        case hours:
+            propertyName = HOURLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS;
+            defaultMillis = "60000";
+            break;
+        case days:
+            propertyName = DAILY_PROCESS_FREQUENCY_POLLING_IN_MILLIS;
+            defaultMillis = "120000";
+            break;
+        case months:
+            propertyName = MONTHLY_PROCESS_FREQUENCY_POLLING_IN_MILLIS;
+            defaultMillis = "180000";
+            break;
+        default:
+            throw new IllegalArgumentException("Unhandled frequency time unit " + frequency.getTimeUnit());
+        }
+        return Long.parseLong(RuntimeProperties.get().getProperty(propertyName, defaultMillis));
+    }
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
index 732da62..1240be9 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/DataAvailabilityService.java
@@ -67,6 +67,9 @@ public class DataAvailabilityService implements FalconNotificationService {
public void register(NotificationRequest request) throws NotificationServiceException {
LOG.info("Registering Data notification for " + request.getCallbackId().toString());
DataNotificationRequest dataNotificationRequest = (DataNotificationRequest) request;
+ if (instancesToIgnore.containsKey(dataNotificationRequest.getCallbackId())) {
+ instancesToIgnore.remove(dataNotificationRequest.getCallbackId());
+ }
delayQueue.offer(dataNotificationRequest);
}
@@ -246,7 +249,11 @@ public class DataAvailabilityService implements FalconNotificationService {
Map<Path, Boolean> locations) throws IOException {
for (Path path : unAvailablePaths) {
if (fs.exists(path)) {
- locations.put(path, true);
+ if (locations.containsKey(path)) {
+ locations.put(path, true);
+ } else {
+ locations.put(new Path(path.toUri().getPath()), true);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
index 401c57e..a110e64 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/impl/SchedulerService.java
@@ -43,6 +43,8 @@ import org.apache.falcon.notification.service.request.JobScheduleNotificationReq
import org.apache.falcon.notification.service.request.NotificationRequest;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.EntityClusterID;
+import org.apache.falcon.state.EntityID;
+import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.ID;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
@@ -304,7 +306,9 @@ public class SchedulerService implements FalconNotificationService, Notification
DAGEngineFactory.getDAGEngine(instance.getCluster()).reRun(instance, props, isForced);
}
} else {
- externalId = DAGEngineFactory.getDAGEngine(instance.getCluster()).run(instance);
+ EntityState entityState = STATE_STORE.getEntity(new EntityID(instance.getEntity()));
+ externalId = DAGEngineFactory.getDAGEngine(instance.getCluster())
+ .run(instance, entityState.getProperties());
}
LOG.info("Scheduled job {} for instance {}", externalId, instance.getId());
JobScheduledEvent event = new JobScheduledEvent(instance.getId(),
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
index c7dd5d3..9e2b993 100644
--- a/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
+++ b/scheduler/src/main/java/org/apache/falcon/notification/service/request/DataNotificationRequest.java
@@ -151,6 +151,15 @@ public class DataNotificationRequest extends NotificationRequest implements Dela
if (!locations.equals(that.locations)) {
return false;
}
+ if (pollingFrequencyInMillis != (that.pollingFrequencyInMillis)) {
+ return false;
+ }
+ if (timeoutInMillis != that.timeoutInMillis) {
+ return false;
+ }
+ if (createdTimeInMillis != that.createdTimeInMillis) {
+ return false;
+ }
return true;
}
@@ -158,8 +167,16 @@ public class DataNotificationRequest extends NotificationRequest implements Dela
public int hashCode() {
int result = cluster.hashCode();
result = 31 * result + (locations != null ? locations.hashCode() : 0);
+ result = 31 * result + Long.valueOf(pollingFrequencyInMillis).hashCode();
+ result = 31 * result + Long.valueOf(timeoutInMillis).hashCode();
+ result = 31 * result + Long.valueOf(createdTimeInMillis).hashCode();
return result;
}
+ @Override
+ public String toString() {
+ return "cluster: " + this.getCluster() + " locations: " + this.locations + " createdTime: "
+ + this.createdTimeInMillis;
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
index c248db6..93dcb12 100644
--- a/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
+++ b/scheduler/src/main/java/org/apache/falcon/predicate/Predicate.java
@@ -163,6 +163,7 @@ public class Predicate implements Serializable {
* @return
*/
public static Predicate createDataPredicate(List<Path> paths) {
+ Collections.sort(paths);
return new Predicate(TYPE.DATA)
.addClause("path", StringUtils.join(paths, ","));
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
index 38479a4..1b26c7a 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/EntityState.java
@@ -20,6 +20,8 @@ package org.apache.falcon.state;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.exception.InvalidStateTransitionException;
+import java.util.Properties;
+
/**
* Represents the state of a schedulable entity.
* Implements {@link org.apache.falcon.state.StateMachine} for an entity.
@@ -27,8 +29,17 @@ import org.apache.falcon.exception.InvalidStateTransitionException;
public class EntityState implements StateMachine<EntityState.STATE, EntityState.EVENT> {
private Entity entity;
private STATE currentState;
+ private Properties properties;
private static final STATE INITIAL_STATE = STATE.SUBMITTED;
+ /**
+ * @return the extra properties associated with this entity, or {@code null} if none have been set
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
+ /**
+ * Associates extra properties with this entity. The given object is stored as-is (no copy is made).
+ *
+ * @param properties properties to associate; may be {@code null}
+ */
+ public void setProperties(Properties properties) {
+ this.properties = properties;
+ }
+
/**
* Enumerates all the valid states of a schedulable entity and the valid transitions from that state.
*/
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
index b862e4d..f5e6f5b 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/InstanceState.java
@@ -270,7 +270,11 @@ public class InstanceState implements StateMachine<InstanceState.STATE, Instance
+ /**
+ * Returns a debug string of the form {@code "<instance id> STATE: <current state>"}.
+ * The id and its trailing space are omitted while the instance has no id yet.
+ */
@Override
public String toString() {
- return instance.getId().toString() + "STATE: " + currentState.toString();
+ StringBuilder output = new StringBuilder();
+ if (instance.getId() != null) {
+ output.append(instance.getId()).append(' ');
+ }
+ return output.append("STATE: ").append(currentState).toString();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/StateService.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/StateService.java b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
index 638bb6e..8a66fb0 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/StateService.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/StateService.java
@@ -21,11 +21,14 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.execution.ExecutionInstance;
+import org.apache.falcon.execution.ProcessExecutionInstance;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Properties;
+
/**
* A service that fetches state from state store, handles state transitions of entities and instances,
* invokes state change handler and finally persists the new state in the state store.
@@ -62,14 +65,18 @@ public final class StateService {
* @param handler
* @throws FalconException
*/
- public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
- throws FalconException {
+ public void handleStateChange(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler,
+ Properties props) throws FalconException {
EntityID id = new EntityID(entity);
if (!stateStore.entityExists(id)) {
// New entity
if (event == EntityState.EVENT.SUBMIT) {
callbackHandler(entity, EntityState.EVENT.SUBMIT, handler);
- stateStore.putEntity(new EntityState(entity));
+ EntityState entityState = new EntityState(entity);
+ if (props != null && !props.isEmpty()) {
+ entityState.setProperties(props);
+ }
+ stateStore.putEntity(entityState);
LOG.debug("Entity {} submitted due to event {}.", id, event.name());
} else {
throw new FalconException("Entity " + id + " does not exist in state store.");
@@ -90,6 +97,11 @@ public final class StateService {
}
}
+ /**
+ * Same as {@link #handleStateChange(Entity, EntityState.EVENT, EntityStateChangeHandler, Properties)}
+ * with {@code null} properties, i.e. a newly submitted entity is stored without extra properties.
+ *
+ * @param entity entity whose state changes
+ * @param event event triggering the state change
+ * @param handler handler notified of the state change
+ * @throws FalconException if the state change cannot be handled
+ */
+ public void handleStateChange(Entity entity, EntityState.EVENT event,
+ EntityStateChangeHandler handler) throws FalconException {
+ handleStateChange(entity, event, handler, null);
+ }
+
// Invokes the right method on the state change handler
private void callbackHandler(Entity entity, EntityState.EVENT event, EntityStateChangeHandler handler)
throws FalconException {
@@ -133,6 +145,7 @@ public final class StateService {
if (event == InstanceState.EVENT.TRIGGER) {
callbackHandler(instance, InstanceState.EVENT.TRIGGER, handler);
stateStore.putExecutionInstance(new InstanceState(instance));
+ ((ProcessExecutionInstance) instance).registerForNotifications(false);
LOG.debug("Instance {} triggered due to event {}.", id, event.name());
} else if (event == InstanceState.EVENT.EXTERNAL_TRIGGER) {
callbackHandler(instance, InstanceState.EVENT.EXTERNAL_TRIGGER, handler);
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
index a7deb89..10490e4 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/EntityStateStore.java
@@ -45,7 +45,7 @@ public interface EntityStateStore {
* @param entityId
* @return true, if entity exists in store.
*/
- boolean entityExists(EntityID entityId) throws StateStoreException;;
+ boolean entityExists(EntityID entityId) throws StateStoreException;
/**
* @param state
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
index 3384186..228b1f9 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/BeanMapperUtil.java
@@ -59,7 +59,7 @@ public final class BeanMapperUtil {
* @param entityState
* @return
*/
- public static EntityBean convertToEntityBean(EntityState entityState) {
+ public static EntityBean convertToEntityBean(EntityState entityState) throws IOException {
EntityBean entityBean = new EntityBean();
Entity entity = entityState.getEntity();
String id = new EntityID(entity).getKey();
@@ -67,6 +67,10 @@ public final class BeanMapperUtil {
entityBean.setName(entity.getName());
entityBean.setState(entityState.getCurrentState().toString());
entityBean.setType(entity.getEntityType().toString());
+ if (entityState.getProperties() != null && !entityState.getProperties().isEmpty()) {
+ byte[] props = getProperties(entityState);
+ entityBean.setProperties(props);
+ }
return entityBean;
}
@@ -76,11 +80,26 @@ public final class BeanMapperUtil {
* @return
* @throws StateStoreException
*/
- public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException {
+ public static EntityState convertToEntityState(EntityBean entityBean) throws StateStoreException, IOException {
try {
Entity entity = EntityUtil.getEntity(entityBean.getType(), entityBean.getName());
EntityState entityState = new EntityState(entity);
entityState.setCurrentState(EntityState.STATE.valueOf(entityBean.getState()));
+ byte[] result = entityBean.getProperties();
+ if (result != null && result.length != 0) {
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(result);
+ ObjectInputStream in = null;
+ Properties properties = null;
+ try {
+ in = new ObjectInputStream(byteArrayInputStream);
+ properties = (Properties) in.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } finally {
+ IOUtils.closeQuietly(in);
+ }
+ entityState.setProperties(properties);
+ }
return entityState;
} catch (FalconException e) {
throw new StateStoreException(e);
@@ -94,7 +113,7 @@ public final class BeanMapperUtil {
* @throws StateStoreException
*/
public static Collection<EntityState> convertToEntityState(Collection<EntityBean> entityBeans)
- throws StateStoreException {
+ throws StateStoreException, IOException {
List<EntityState> entityStates = new ArrayList<>();
if (entityBeans != null && !entityBeans.isEmpty()) {
for (EntityBean entityBean : entityBeans) {
@@ -306,6 +325,18 @@ public final class BeanMapperUtil {
}
}
+ /**
+ * Serializes the properties attached to the given entity state using Java serialization.
+ *
+ * @param entityState entity state whose {@code getProperties()} are serialized
+ * @return the serialized bytes
+ * @throws IOException if serialization fails
+ */
+ public static byte [] getProperties(EntityState entityState) throws IOException {
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ try (ObjectOutputStream out = new ObjectOutputStream(byteArrayOutputStream)) {
+ out.writeObject(entityState.getProperties());
+ }
+ // Read the bytes only after the object stream is closed, so any data still buffered
+ // inside ObjectOutputStream has been flushed into the byte array.
+ return byteArrayOutputStream.toByteArray();
+ }
+
/**
* @param summary
* @return A map of state and count given the JQL result.
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
index 38d9217..825fbc1 100644
--- a/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
+++ b/scheduler/src/main/java/org/apache/falcon/state/store/jdbc/JDBCStateStore.java
@@ -75,11 +75,16 @@ public final class JDBCStateStore extends AbstractStateStore {
if (entityExists(entityID)) {
throw new StateStoreException("Entity with key, " + key + " already exists.");
}
- EntityBean entityBean = BeanMapperUtil.convertToEntityBean(entityState);
- EntityManager entityManager = getEntityManager();
- beginTransaction(entityManager);
- entityManager.persist(entityBean);
- commitAndCloseTransaction(entityManager);
+ EntityBean entityBean = null;
+ try {
+ entityBean = BeanMapperUtil.convertToEntityBean(entityState);
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ entityManager.persist(entityBean);
+ commitAndCloseTransaction(entityManager);
+ } catch (IOException e) {
+ throw new StateStoreException(e);
+ }
}
@@ -97,7 +102,11 @@ public final class JDBCStateStore extends AbstractStateStore {
if (entityBean == null) {
return null;
}
- return BeanMapperUtil.convertToEntityState(entityBean);
+ try {
+ return BeanMapperUtil.convertToEntityState(entityBean);
+ } catch (IOException e) {
+ throw new StateStoreException(e);
+ }
}
private EntityBean getEntityBean(EntityID id) {
@@ -133,7 +142,11 @@ public final class JDBCStateStore extends AbstractStateStore {
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ENTITIES);
List result = q.getResultList();
entityManager.close();
- return BeanMapperUtil.convertToEntityState(result);
+ try {
+ return BeanMapperUtil.convertToEntityState(result);
+ } catch (IOException e) {
+ throw new StateStoreException(e);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
index 49e083c..29b3bbb 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/DAGEngine.java
@@ -35,10 +35,11 @@ public interface DAGEngine {
* Run an instance for execution.
*
* @param instance
+ * @param props additional properties for this run (engine-specific semantics — TODO document)
* @return
* @throws DAGEngineException
*/
- String run(ExecutionInstance instance) throws DAGEngineException;
+ String run(ExecutionInstance instance, Properties props) throws DAGEngineException;
/**
* @param instance
@@ -85,9 +86,10 @@ public interface DAGEngine {
* Perform dryrun of an instance.
*
* @param entity
+ * @param props additional properties for the submission; e.g. FalconWorkflowEngine#dryRun sets
+ * {@code falcon.system.skip.dryrun} to {@code "true"} when a skip of the dry run is requested
* @throws DAGEngineException
*/
- void submit(Entity entity) throws DAGEngineException;
+ void submit(Entity entity, Properties props) throws DAGEngineException;
/**
* Returns info about the Job.
http://git-wip-us.apache.org/repos/asf/falcon/blob/7e4dc0d9/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
index c6d4212..6dbec0c 100644
--- a/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
+++ b/scheduler/src/main/java/org/apache/falcon/workflow/engine/FalconWorkflowEngine.java
@@ -68,6 +68,7 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun";
public static final String FALCON_RERUN = "falcon.system.rerun";
+ public static final String FALCON_SKIP_DRYRUN = "falcon.system.skip.dryrun";
public static final String FALCON_RESUME = "falcon.system.resume";
private enum JobAction {
@@ -85,13 +86,24 @@ public class FalconWorkflowEngine extends AbstractWorkflowEngine {
}
+ /**
+ * Schedules the entity through the execution service, forwarding the supplied properties and,
+ * when requested, the {@link #FALCON_SKIP_DRYRUN} flag.
+ */
@Override
- public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) throws FalconException {
- EXECUTION_SERVICE.schedule(entity);
+ public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException {
+ Properties props = new Properties();
+ if (suppliedProps != null && !suppliedProps.isEmpty()) {
+ props.putAll(suppliedProps);
+ }
+ // skipDryRun is a boxed Boolean: compare against TRUE so that null means "do not skip"
+ // instead of throwing a NullPointerException on auto-unboxing.
+ if (Boolean.TRUE.equals(skipDryRun)) {
+ props.setProperty(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true");
+ }
+ EXECUTION_SERVICE.schedule(entity, props);
}
+ /**
+ * Submits the entity to the DAG engine of the given cluster, passing {@link #FALCON_SKIP_DRYRUN}
+ * when the caller asked to skip the dry run.
+ */
@Override
public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException {
- DAGEngineFactory.getDAGEngine(clusterName).submit(entity);
+ Properties props = new Properties();
+ // Null-safe check on the boxed flag; a null skipDryRun is treated as false.
+ if (Boolean.TRUE.equals(skipDryRun)) {
+ props.setProperty(FalconWorkflowEngine.FALCON_SKIP_DRYRUN, "true");
+ }
+ DAGEngineFactory.getDAGEngine(clusterName).submit(entity, props);
}
@Override