Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:10 UTC
[5/5] git commit: FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e2545b08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e2545b08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e2545b08
Branch: refs/heads/master
Commit: e2545b0874d206f3be88d4b3ac7003eae1161c44
Parents: 5e43521
Author: Shwetha GS <sh...@gmail.com>
Authored: Tue Mar 18 17:10:54 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Tue Mar 18 17:10:54 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/falcon/util/ReflectionUtils.java | 33 +-
.../apache/falcon/workflow/WorkflowBuilder.java | 20 +-
.../apache/falcon/util/ReflectionUtilsTest.java | 49 ++
.../falcon/converter/OozieFeedMapper.java | 596 -------------
.../workflow/OozieFeedWorkflowBuilder.java | 594 ++++++++++++-
.../falcon/converter/OozieFeedMapperTest.java | 505 -----------
.../converter/OozieFeedWorkflowBuilderTest.java | 505 +++++++++++
.../converter/AbstractOozieEntityMapper.java | 428 ----------
.../java/org/apache/falcon/util/OozieUtils.java | 54 ++
.../falcon/workflow/OozieWorkflowBuilder.java | 370 ++++++++
.../workflow/engine/OozieWorkflowEngine.java | 15 +-
.../falcon/converter/OozieProcessMapper.java | 833 -------------------
.../workflow/OozieProcessWorkflowBuilder.java | 822 +++++++++++++++++-
.../OozieProcessMapperLateProcessTest.java | 96 ---
.../converter/OozieProcessMapperTest.java | 557 -------------
.../OozieProcessWorkflowBuilderTest.java | 559 +++++++++++++
.../falcon/retention/FeedEvictorTest.java | 2 +-
18 files changed, 2936 insertions(+), 3104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5748d27..f3ddf96 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
(Venkatesh Seetharam)
IMPROVEMENTS
+ FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. (Shwetha GS)
+
FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
index 4a00fa9..80022e0 100644
--- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
import org.apache.falcon.FalconException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
/**
@@ -30,12 +31,11 @@ public final class ReflectionUtils {
private ReflectionUtils() {}
public static <T> T getInstance(String classKey) throws FalconException {
- String clazzName = StartupProperties.get().getProperty(classKey);
- try {
- return ReflectionUtils.<T>getInstanceByClassName(clazzName);
- } catch (FalconException e) {
- throw new FalconException("Unable to get instance for key: " + classKey, e);
- }
+ return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey));
+ }
+
+ public static <T> T getInstance(String classKey, Class<?> argCls, Object arg) throws FalconException {
+ return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey), argCls, arg);
}
@SuppressWarnings("unchecked")
@@ -52,4 +52,25 @@ public final class ReflectionUtils {
throw new FalconException("Unable to get instance for " + clazzName, e);
}
}
+
+ /**
+ * Invokes a constructor with one argument.
+ * @param clazzName - name of the class to instantiate
+ * @param argCls - class of the constructor argument
+ * @param arg - constructor argument
+ * @param <T> - instance type
+ * @return class instance
+ * @throws FalconException if the class cannot be loaded or instantiated
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getInstanceByClassName(String clazzName, Class<?> argCls, Object arg) throws
+ FalconException {
+ try {
+ Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName);
+ Constructor<T> constructor = clazz.getConstructor(argCls);
+ return constructor.newInstance(arg);
+ } catch (Exception e) {
+ throw new FalconException("Unable to get instance for " + clazzName, e);
+ }
+ }
}
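
As a point of reference, here is a minimal, self-contained sketch of the same load / resolve-constructor / invoke sequence outside Falcon. ReflectionSketch is a hypothetical demo class, not part of this commit; IllegalStateException is used only because it has a public one-argument (String) constructor.

import java.lang.reflect.Constructor;

// Hypothetical demo class mirroring getInstanceByClassName(String, Class<?>, Object).
public final class ReflectionSketch {
    private ReflectionSketch() {}

    @SuppressWarnings("unchecked")
    static <T> T newInstance(String clazzName, Class<?> argCls, Object arg) throws Exception {
        // Load the class, resolve its single-argument constructor, and invoke it.
        Class<T> clazz = (Class<T>) ReflectionSketch.class.getClassLoader().loadClass(clazzName);
        Constructor<T> constructor = clazz.getConstructor(argCls);
        return constructor.newInstance(arg);
    }

    public static void main(String[] args) throws Exception {
        Exception e = ReflectionSketch.<Exception>newInstance(
                "java.lang.IllegalStateException", String.class, "boom");
        System.out.println(e.getClass().getName() + ": " + e.getMessage()); // java.lang.IllegalStateException: boom
    }
}

Wrapping any reflective failure in a single FalconException, as the real method does, spares callers from handling the several checked exceptions the reflection API can throw.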
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
index 26243e7..1f9a8c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
@@ -22,8 +22,6 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.util.ReflectionUtils;
-import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,16 +30,22 @@ import java.util.Properties;
* @param <T>
*/
public abstract class WorkflowBuilder<T extends Entity> {
+ protected T entity;
+
+ protected WorkflowBuilder(T entity) {
+ this.entity = entity;
+ }
+
+ public T getEntity() {
+ return entity;
+ }
public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
String classKey = engine + "." + entity.getEntityType().name().toLowerCase() + ".workflow.builder";
- return ReflectionUtils.getInstance(classKey);
+ return ReflectionUtils.getInstance(classKey, entity.getEntityType().getEntityClass(), entity);
}
- public abstract Map<String, Properties> newWorkflowSchedule(T entity, List<String> clusters) throws FalconException;
-
- public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user)
- throws FalconException;
+ public abstract Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException;
- public abstract String[] getWorkflowNames(T entity);
+ public abstract String[] getWorkflowNames();
}
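
For illustration, the key that getBuilder resolves from StartupProperties follows the pattern <engine>.<entity type, lower-cased>.workflow.builder, and the class it names is then instantiated through its one-argument constructor with the entity itself (via the new ReflectionUtils overload above). A self-contained sketch of the key composition; BuilderKeySketch and its enum are hypothetical stand-ins, not part of this commit:

// Hypothetical demo class; only the key pattern is taken from getBuilder() above.
public final class BuilderKeySketch {
    enum EntityType { FEED, PROCESS, CLUSTER }

    static String classKey(String engine, EntityType type) {
        // engine + "." + entity type (lower-cased) + ".workflow.builder"
        return engine + "." + type.name().toLowerCase() + ".workflow.builder";
    }

    public static void main(String[] args) {
        System.out.println(classKey("oozie", EntityType.FEED));    // oozie.feed.workflow.builder
        System.out.println(classKey("oozie", EntityType.PROCESS)); // oozie.process.workflow.builder
    }
}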
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
new file mode 100644
index 0000000..bc0bce0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ReflectionUtils.
+ */
+@Test
+public class ReflectionUtilsTest {
+ public void testGetInstance() throws FalconException {
+ // with 1-arg constructor, null argument
+ Object e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class, null);
+ Assert.assertTrue(e instanceof FalconException);
+
+ // with 1-arg constructor, non-null argument
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class,
+ new Throwable());
+ Assert.assertTrue(e instanceof FalconException);
+
+ // no public constructor; instance obtained via the static get() method
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.util.StartupProperties");
+ Assert.assertTrue(e instanceof StartupProperties);
+
+ // with a public no-arg constructor
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.entity.parser.ClusterEntityParser");
+ Assert.assertTrue(e instanceof ClusterEntityParser);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
deleted file mode 100644
index 2b3315f..0000000
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
-import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.*;
-
-/**
- * Mapper which maps feed definition to oozie workflow definitions for
- * replication & retention.
- */
-public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
-
- private static final Logger LOG = Logger.getLogger(OozieFeedMapper.class);
-
- private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
- private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
-
- public OozieFeedMapper(Feed feed) {
- super(feed);
- }
-
- @Override
- protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
- List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
- COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
- if (retentionCoord != null) {
- coords.add(retentionCoord);
- }
- List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
- coords.addAll(replicationCoords);
- return coords;
- }
-
- private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-
- Feed feed = getEntity();
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-
- if (feedCluster.getValidity().getEnd().before(new Date())) {
- LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
- + " is not in the future");
- return null;
- }
-
- return retentionMapper.getRetentionCoordinator(cluster, bundlePath, feed, feedCluster);
- }
-
- private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
- throws FalconException {
-
- Feed feed = getEntity();
- List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-
- if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
- String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
- Path basePath = getCoordPath(bundlePath, coordName);
- replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
-
- for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
- if (feedCluster.getType() == ClusterType.SOURCE) {
- COORDINATORAPP coord = replicationMapper.createAndGetCoord(feed,
- (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
- targetCluster, bundlePath);
-
- if (coord != null) {
- replicationCoords.add(coord);
- }
- }
- }
- }
-
- return replicationCoords;
- }
-
- @Override
- protected Map<String, String> getEntityProperties() {
- Feed feed = getEntity();
- Map<String, String> props = new HashMap<String, String>();
- if (feed.getProperties() != null) {
- for (Property prop : feed.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
- return props;
- }
-
- private final class RetentionOozieWorkflowMapper {
-
- private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-
- private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
- org.apache.falcon.entity.v0.feed.Cluster feedCluster)
- throws FalconException {
-
- COORDINATORAPP retentionApp = new COORDINATORAPP();
- String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
- retentionApp.setName(coordName);
- retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
- retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
- retentionApp.setTimezone(feed.getTimezone().getID());
- TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
- if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
- retentionApp.setFrequency("${coord:hours(6)}");
- } else {
- retentionApp.setFrequency("${coord:days(1)}");
- }
-
- Path wfPath = getCoordPath(bundlePath, coordName);
- retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
- return retentionApp;
- }
-
- private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
- throws FalconException {
- Feed feed = getEntity();
- ACTION retentionAction = new ACTION();
- WORKFLOW retentionWorkflow = new WORKFLOW();
- createRetentionWorkflow(cluster, wfPath, wfName);
- retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
- Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
- props.put("timeZone", feed.getTimezone().getID());
- props.put("frequency", feed.getFrequency().getTimeUnit().name());
-
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- props.put("falconFeedStorageType", storage.getType().name());
-
- String feedDataPath = storage.getUriTemplate();
- props.put("feedDataPath",
- feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster =
- FeedHelper.getCluster(feed, cluster.getName());
- props.put("limit", feedCluster.getRetention().getLimit().toString());
-
- props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
- props.put(ARG.feedNames.getPropName(), feed.getName());
- props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
- props.put("falconInputFeeds", feed.getName());
- props.put("falconInPaths", "IGNORE");
-
- propagateUserWorkflowProperties(props, "eviction");
-
- retentionWorkflow.setConfiguration(getCoordConfig(props));
- retentionAction.setWorkflow(retentionWorkflow);
-
- return retentionAction;
- }
-
- private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
- try {
- WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
- retWfApp.setName(wfName);
- addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
- addOozieRetries(retWfApp);
- marshal(cluster, retWfApp, wfPath);
- } catch(IOException e) {
- throw new FalconException("Unable to create retention workflow", e);
- }
- }
- }
-
- private class ReplicationOozieWorkflowMapper {
- private static final String MR_MAX_MAPS = "maxMaps";
-
- private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
- private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
- private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
- private static final String TIMEOUT = "timeout";
- private static final String PARALLEL = "parallel";
-
- private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
- throws FalconException {
- try {
- WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
- repWFapp.setName(wfName);
- addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
- addOozieRetries(repWFapp);
- marshal(cluster, repWFapp, wfPath);
- } catch(IOException e) {
- throw new FalconException("Unable to create replication workflow", e);
- }
-
- }
-
- private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
- Path bundlePath) throws FalconException {
- long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
- Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
- Date sourceEndDate = getEndDate(feed, srcCluster);
-
- Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
- Date targetEndDate = getEndDate(feed, trgCluster);
-
- if (noOverlapExists(sourceStartDate, sourceEndDate,
- targetStartDate, targetEndDate)) {
- LOG.warn("Not creating replication coordinator, as the source cluster:"
- + srcCluster.getName()
- + " and target cluster: "
- + trgCluster.getName()
- + " do not have overlapping dates");
- return null;
- }
-
- COORDINATORAPP replicationCoord;
- try {
- replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
- } catch (FalconException e) {
- throw new FalconException("Cannot unmarshall replication coordinator template", e);
- }
-
- String coordName = EntityUtil.getWorkflowName(
- Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
- String start = sourceStartDate.after(targetStartDate)
- ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
- String end = sourceEndDate.before(targetEndDate)
- ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
-
- initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
- setCoordControls(feed, replicationCoord);
-
- final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
- initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
-
- final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
- initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
-
- Path wfPath = getCoordPath(bundlePath, coordName);
- ACTION replicationWorkflowAction = getReplicationWorkflowAction(
- srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
- replicationCoord.setAction(replicationWorkflowAction);
-
- return replicationCoord;
- }
-
- private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
- Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
- return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
- }
-
- private Date getEndDate(Feed feed, Cluster cluster) {
- return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
- }
-
- private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
- Date targetStartDate, Date targetEndDate) {
- return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
- }
-
- private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
- Feed feed, String start, String end, long delayInMillis) {
- replicationCoord.setName(coordName);
- replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
- if (delayInMillis > 0) {
- long delayInMins = -1 * delayInMillis / (1000 * 60);
- String elExp = "${now(0," + delayInMins + ")}";
-
- replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
- replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
- }
-
- replicationCoord.setStart(start);
- replicationCoord.setEnd(end);
- replicationCoord.setTimezone(feed.getTimezone().getID());
- }
-
- private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
- Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
- long delayInMillis=0;
- if (replicationDelay != null) {
- delayInMillis = ExpressionHelper.get().evaluate(
- replicationDelay.toString(), Long.class);
- }
-
- return delayInMillis;
- }
-
- private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
- long frequencyInMillis = ExpressionHelper.get().evaluate(
- feed.getFrequency().toString(), Long.class);
- long timeoutInMillis = frequencyInMillis * 6;
- if (timeoutInMillis < THIRTY_MINUTES) {
- timeoutInMillis = THIRTY_MINUTES;
- }
-
- Map<String, String> props = getEntityProperties();
- String timeout = props.get(TIMEOUT);
- if (timeout!=null) {
- try{
- timeoutInMillis= ExpressionHelper.get().evaluate(timeout, Long.class);
- } catch (Exception ignore) {
- LOG.error("Unable to evaluate timeout:", ignore);
- }
- }
- replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
- replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
- String parallelProp = props.get(PARALLEL);
- int parallel = 1;
- if (parallelProp != null) {
- try {
- parallel = Integer.parseInt(parallelProp);
- } catch (NumberFormatException ignore) {
- LOG.error("Unable to parse parallel:", ignore);
- }
- }
- replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
- }
-
- private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
- Storage sourceStorage) throws FalconException {
- SYNCDATASET inputDataset = (SYNCDATASET)
- replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
- String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
- if (sourceStorage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- inputDataset.setUriTemplate(uriTemplate);
-
- setDatasetValues(inputDataset, feed, srcCluster);
-
- if (feed.getAvailabilityFlag() == null) {
- inputDataset.setDoneFlag("");
- } else {
- inputDataset.setDoneFlag(feed.getAvailabilityFlag());
- }
- }
-
- private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
- Storage targetStorage) throws FalconException {
- SYNCDATASET outputDataset = (SYNCDATASET)
- replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
-
- String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
- if (targetStorage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- outputDataset.setUriTemplate(uriTemplate);
-
- setDatasetValues(outputDataset, feed, targetCluster);
- }
-
- private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
- dataset.setInitialInstance(SchemaHelper.formatDateUTC(
- FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
- dataset.setTimezone(feed.getTimezone().getID());
- dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
- }
-
- private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
- String wfName, Storage sourceStorage,
- Storage targetStorage) throws FalconException {
- ACTION replicationAction = new ACTION();
- WORKFLOW replicationWF = new WORKFLOW();
- try {
- replicationWF.setAppPath(getStoragePath(wfPath.toString()));
- Feed feed = getEntity();
-
- Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
- props.put("srcClusterName", srcCluster.getName());
- props.put("srcClusterColo", srcCluster.getColo());
- if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
- props.put(MR_MAX_MAPS, getDefaultMaxMaps());
- }
-
- // the storage type is uniform across source and target feeds for replication
- props.put("falconFeedStorageType", sourceStorage.getType().name());
-
- String instancePaths = null;
- if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
- String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
- instancePaths = pathsWithPartitions;
-
- propagateFileSystemCopyProperties(pathsWithPartitions, props);
- } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
- instancePaths = "${coord:dataIn('input')}";
-
- final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
- propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
- final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
- propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
- propagateTableCopyProperties(srcCluster, sourceTableStorage,
- trgCluster, targetTableStorage, props);
- setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
- }
-
- propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
- propagateUserWorkflowProperties(props, "replication");
-
- replicationWF.setConfiguration(getCoordConfig(props));
- replicationAction.setWorkflow(replicationWF);
-
- } catch (Exception e) {
- throw new FalconException("Unable to create replication workflow", e);
- }
-
- return replicationAction;
- }
-
- private String getDefaultMaxMaps() {
- return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
- }
-
- private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
- Feed feed) throws FalconException {
- String srcPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
- srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
-
- String targetPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
- targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
-
- StringBuilder pathsWithPartitions = new StringBuilder();
- pathsWithPartitions.append("${coord:dataIn('input')}/")
- .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
- String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
- parts = StringUtils.stripEnd(parts, "/");
- return parts;
- }
-
- private void propagateFileSystemCopyProperties(String pathsWithPartitions,
- Map<String, String> props) throws FalconException {
- props.put("sourceRelativePaths", pathsWithPartitions);
-
- props.put("distcpSourcePaths", "${coord:dataIn('input')}");
- props.put("distcpTargetPaths", "${coord:dataOut('output')}");
- }
-
- private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
- Map<String, String> props, String prefix) {
- props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
- props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
- props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
-
- props.put(prefix + "Database", tableStorage.getDatabase());
- props.put(prefix + "Table", tableStorage.getTable());
- props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
- }
-
- private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
- Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
- throws IOException, FalconException {
- Configuration conf = ClusterHelper.getConfiguration(trgCluster);
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-
- // copy import export scripts to stagingDir
- Path scriptPath = new Path(wfPath, "scripts");
- copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
- copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
- // create hive conf to stagingDir
- Path confPath = new Path(wfPath + "/conf");
- createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
- createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
- }
-
- private void copyHiveScript(FileSystem fs, Path scriptPath,
- String localScriptPath, String scriptName) throws IOException {
- OutputStream out = null;
- InputStream in = null;
- try {
- out = fs.create(new Path(scriptPath, scriptName));
- in = OozieFeedMapper.class.getResourceAsStream(localScriptPath + scriptName);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
- }
-
- private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
- Cluster trgCluster, CatalogStorage targetStorage,
- Map<String, String> props) {
- // create staging dirs for export at source & set it as distcpSourcePaths
- String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
- String sourceStagingDir =
- FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
- + "/" + sourceDatedPartitionKey
- + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
- props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
- // create staging dirs for import at target & set it as distcpTargetPaths
- String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
- String targetStagingDir =
- FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
- + "/" + targetDatedPartitionKey
- + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
- props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
- props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
- }
-
- private void propagateLateDataProperties(Feed feed, String instancePaths,
- String falconFeedStorageType, Map<String, String> props) {
- // todo these pairs are the same but used in different context
- // late data handler - should-record action
- props.put("falconInputFeeds", feed.getName());
- props.put("falconInPaths", instancePaths);
-
- // storage type for each corresponding feed - in this case only one feed is involved
- // needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", falconFeedStorageType);
-
- // falcon post processing
- props.put(ARG.feedNames.getPropName(), feed.getName());
- props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
- }
- }
-
- private void addOozieRetries(WORKFLOWAPP workflow) {
- for (Object object : workflow.getDecisionOrForkOrJoin()) {
- if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
- continue;
- }
- org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
- String actionName = action.getName();
- if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- }
- }
- }
-
- private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
- props.put("userWorkflowName", policy + "-policy");
- props.put("userWorkflowEngine", "falcon");
-
- String version;
- try {
- version = BuildProperties.get().getProperty("build.version");
- } catch (Exception e) { // unfortunate that this is only available in prism/webapp
- version = "0.5";
- }
- props.put("userWorkflowVersion", version);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 5e3a30e..2008c2d 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -18,60 +18,83 @@
package org.apache.falcon.workflow;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
-import org.apache.falcon.converter.AbstractOozieEntityMapper;
-import org.apache.falcon.converter.OozieFeedMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
* Workflow definition builder for feed replication & retention.
*/
public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
+ private static final Logger LOG = Logger.getLogger(OozieFeedWorkflowBuilder.class);
+
+ public OozieFeedWorkflowBuilder(Feed entity) {
+ super(entity);
+ }
@Override
- public Map<String, Properties> newWorkflowSchedule(Feed feed, List<String> clusters) throws FalconException {
+ public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
for (String clusterName : clusters) {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
- Properties properties = newWorkflowSchedule(feed, feedCluster.getValidity().getStart(), clusterName,
- CurrentUser.getUser());
- if (properties == null) {
- continue;
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, clusterName);
+ if (!feedCluster.getValidity().getStart().before(feedCluster.getValidity().getEnd())) {
+ LOG.info("feed validity start <= end for cluster " + clusterName + ". Skipping schedule");
+ break;
}
- propertiesMap.put(clusterName, properties);
- }
- return propertiesMap;
- }
- @Override
- public Properties newWorkflowSchedule(Feed feed, Date startDate, String clusterName, String user)
- throws FalconException {
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
- if (!startDate.before(feedCluster.getValidity().getEnd())) {
- return null;
- }
+ Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
+ Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
- Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
- Path bundlePath = EntityUtil.getNewStagingPath(cluster, feed);
- Feed feedClone = (Feed) feed.copy();
- EntityUtil.setStartDate(feedClone, clusterName, startDate);
-
- AbstractOozieEntityMapper<Feed> mapper = new OozieFeedMapper(feedClone);
- if (!mapper.map(cluster, bundlePath)) {
- return null;
+ if (!map(cluster, bundlePath)) {
+ break;
+ }
+ propertiesMap.put(clusterName, createAppProperties(clusterName, bundlePath, CurrentUser.getUser()));
}
- return createAppProperties(clusterName, bundlePath, user);
+ return propertiesMap;
}
@Override
@@ -82,9 +105,518 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
}
@Override
- public String[] getWorkflowNames(Feed entity) {
+ public String[] getWorkflowNames() {
return new String[]{
EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(),
EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(), };
}
+
+ private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
+ private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
+
+ @Override
+ public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+ List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
+ COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
+ if (retentionCoord != null) {
+ coords.add(retentionCoord);
+ }
+ List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
+ coords.addAll(replicationCoords);
+ return coords;
+ }
+
+ private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+
+ if (feedCluster.getValidity().getEnd().before(new Date())) {
+ LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
+ + " is not in the future");
+ return null;
+ }
+
+ return retentionMapper.getRetentionCoordinator(cluster, bundlePath, entity, feedCluster);
+ }
+
+ private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
+ throws FalconException {
+ List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
+
+ if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
+ String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+ Path basePath = getCoordPath(bundlePath, coordName);
+ replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
+
+ for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
+ if (feedCluster.getType() == ClusterType.SOURCE) {
+ COORDINATORAPP coord = replicationMapper.createAndGetCoord(entity,
+ (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
+ targetCluster, bundlePath);
+
+ if (coord != null) {
+ replicationCoords.add(coord);
+ }
+ }
+ }
+ }
+
+ return replicationCoords;
+ }
+
+ @Override
+ protected Map<String, String> getEntityProperties() {
+ Map<String, String> props = new HashMap<String, String>();
+ if (entity.getProperties() != null) {
+ for (Property prop : entity.getProperties().getProperties()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ }
+ return props;
+ }
+
+ private final class RetentionOozieWorkflowMapper {
+
+ private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
+
+ private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster) throws FalconException {
+ COORDINATORAPP retentionApp = new COORDINATORAPP();
+ String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+ retentionApp.setName(coordName);
+ retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+ retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
+ retentionApp.setTimezone(feed.getTimezone().getID());
+ TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
+ if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+ retentionApp.setFrequency("${coord:hours(6)}");
+ } else {
+ retentionApp.setFrequency("${coord:days(1)}");
+ }
+
+ Path wfPath = getCoordPath(bundlePath, coordName);
+ retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
+ return retentionApp;
+ }
+
+ private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
+ throws FalconException {
+ ACTION retentionAction = new ACTION();
+ WORKFLOW retentionWorkflow = new WORKFLOW();
+ createRetentionWorkflow(cluster, wfPath, wfName);
+ retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+
+ Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+ props.put("timeZone", entity.getTimezone().getID());
+ props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+ final Storage storage = FeedHelper.createStorage(cluster, entity);
+ props.put("falconFeedStorageType", storage.getType().name());
+
+ String feedDataPath = storage.getUriTemplate();
+ props.put("feedDataPath",
+ feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+ props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+ props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+ props.put(ARG.feedNames.getPropName(), entity.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+ props.put("falconInputFeeds", entity.getName());
+ props.put("falconInPaths", "IGNORE");
+
+ propagateUserWorkflowProperties(props, "eviction");
+
+ retentionWorkflow.setConfiguration(getCoordConfig(props));
+ retentionAction.setWorkflow(retentionWorkflow);
+ return retentionAction;
+ }
+
+ private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+ try {
+ WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+ retWfApp.setName(wfName);
+ addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+ addOozieRetries(retWfApp);
+ marshal(cluster, retWfApp, wfPath);
+ } catch(IOException e) {
+ throw new FalconException("Unable to create retention workflow", e);
+ }
+ }
+ }
+
+ private class ReplicationOozieWorkflowMapper {
+ private static final String MR_MAX_MAPS = "maxMaps";
+
+ private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+ private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
+ private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
+
+ private static final String TIMEOUT = "timeout";
+ private static final String PARALLEL = "parallel";
+
+ private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
+ throws FalconException {
+ try {
+ WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+ repWFapp.setName(wfName);
+ addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+ addOozieRetries(repWFapp);
+ marshal(cluster, repWFapp, wfPath);
+ } catch(IOException e) {
+ throw new FalconException("Unable to create replication workflow", e);
+ }
+
+ }
+
+ private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
+ Path bundlePath) throws FalconException {
+ long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
+ Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
+ Date sourceEndDate = getEndDate(feed, srcCluster);
+
+ Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
+ Date targetEndDate = getEndDate(feed, trgCluster);
+
+ if (noOverlapExists(sourceStartDate, sourceEndDate,
+ targetStartDate, targetEndDate)) {
+ LOG.warn("Not creating replication coordinator, as the source cluster:" + srcCluster.getName()
+ + "and target cluster: " + trgCluster.getName() + " do not have overlapping dates");
+ return null;
+ }
+
+ COORDINATORAPP replicationCoord;
+ try {
+ replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+ } catch (FalconException e) {
+ throw new FalconException("Cannot unmarshall replication coordinator template", e);
+ }
+
+ String coordName = EntityUtil.getWorkflowName(
+ Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
+ String start = sourceStartDate.after(targetStartDate)
+ ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+ String end = sourceEndDate.before(targetEndDate)
+ ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+ initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
+ setCoordControls(feed, replicationCoord);
+
+ final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
+ initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
+
+ final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
+ initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
+
+ Path wfPath = getCoordPath(bundlePath, coordName);
+ ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+ srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
+ replicationCoord.setAction(replicationWorkflowAction);
+
+ return replicationCoord;
+ }
+
+ private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+ Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+ return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+ }
+
+ private Date getEndDate(Feed feed, Cluster cluster) {
+ return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+ }
+
+ private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+ Date targetStartDate, Date targetEndDate) {
+ return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+ }
+
+ private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
+ Feed feed, String start, String end, long delayInMillis) {
+ replicationCoord.setName(coordName);
+ replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+ if (delayInMillis > 0) {
+ long delayInMins = -1 * delayInMillis / (1000 * 60);
+ String elExp = "${now(0," + delayInMins + ")}";
+
+ replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+ replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+ }
+
+ replicationCoord.setStart(start);
+ replicationCoord.setEnd(end);
+ replicationCoord.setTimezone(feed.getTimezone().getID());
+ }
+
+ private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+ Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+ long delayInMillis = 0;
+ if (replicationDelay != null) {
+ delayInMillis = ExpressionHelper.get().evaluate(
+ replicationDelay.toString(), Long.class);
+ }
+
+ return delayInMillis;
+ }
+
+ private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
+ long frequencyInMillis = ExpressionHelper.get().evaluate(
+ feed.getFrequency().toString(), Long.class);
+ long timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+
+ Map<String, String> props = getEntityProperties();
+ String timeout = props.get(TIMEOUT);
+ if (timeout != null) {
+ try {
+ timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+ } catch (Exception ignore) {
+ LOG.error("Unable to evaluate timeout:", ignore);
+ }
+ }
+ replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+ replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+ String parallelProp = props.get(PARALLEL);
+ int parallel = 1;
+ if (parallelProp != null) {
+ try {
+ parallel = Integer.parseInt(parallelProp);
+ } catch (NumberFormatException ignore) {
+ LOG.error("Unable to parse parallel:", ignore);
+ }
+ }
+ replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+ }
+
+ private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
+ Storage sourceStorage) throws FalconException {
+ SYNCDATASET inputDataset = (SYNCDATASET)
+ replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+ String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
+ if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ inputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(inputDataset, feed, srcCluster);
+
+ if (feed.getAvailabilityFlag() == null) {
+ inputDataset.setDoneFlag("");
+ } else {
+ inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+ }
+ }
+
+ private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
+ Storage targetStorage) throws FalconException {
+ SYNCDATASET outputDataset = (SYNCDATASET)
+ replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+ String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
+ if (targetStorage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ outputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(outputDataset, feed, targetCluster);
+ }
+
+ private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
+ dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+ FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+ dataset.setTimezone(feed.getTimezone().getID());
+ dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+ }
+
+ private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
+ String wfName, Storage sourceStorage,
+ Storage targetStorage) throws FalconException {
+ ACTION replicationAction = new ACTION();
+ WORKFLOW replicationWF = new WORKFLOW();
+ try {
+ replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+ Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+ props.put("srcClusterName", srcCluster.getName());
+ props.put("srcClusterColo", srcCluster.getColo());
+ if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+ props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+ }
+
+ // the storage type is uniform across source and target feeds for replication
+ props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+ String instancePaths = null;
+ if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
+ instancePaths = pathsWithPartitions;
+
+ propagateFileSystemCopyProperties(pathsWithPartitions, props);
+ } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+ instancePaths = "${coord:dataIn('input')}";
+
+ final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+ propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+ final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+ propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+ propagateTableCopyProperties(srcCluster, sourceTableStorage,
+ trgCluster, targetTableStorage, props);
+ setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+ }
+
+ propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
+ propagateUserWorkflowProperties(props, "replication");
+
+ replicationWF.setConfiguration(getCoordConfig(props));
+ replicationAction.setWorkflow(replicationWF);
+
+ } catch (Exception e) {
+ throw new FalconException("Unable to create replication workflow", e);
+ }
+
+ return replicationAction;
+ }
+
+ private String getDefaultMaxMaps() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+ }
+
+ private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
+ Feed feed) throws FalconException {
+ String srcPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
+ String targetPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
+ targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+ StringBuilder pathsWithPartitions = new StringBuilder();
+ pathsWithPartitions.append("${coord:dataIn('input')}/")
+ .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+ String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+ parts = StringUtils.stripEnd(parts, "/");
+ return parts;
+ }
+
+ private void propagateFileSystemCopyProperties(String pathsWithPartitions,
+ Map<String, String> props) throws FalconException {
+ props.put("sourceRelativePaths", pathsWithPartitions);
+
+ props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+ props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+ }
+
+ private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+ Map<String, String> props, String prefix) {
+ props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+ props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+ props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+ props.put(prefix + "Database", tableStorage.getDatabase());
+ props.put(prefix + "Table", tableStorage.getTable());
+ props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
+ }
+
+ private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+ throws IOException, FalconException {
+ Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+ // copy import export scripts to stagingDir
+ Path scriptPath = new Path(wfPath, "scripts");
+ copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+ copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+ // create hive conf to stagingDir
+ Path confPath = new Path(wfPath + "/conf");
+ createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+ createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+ }
+
+ private void copyHiveScript(FileSystem fs, Path scriptPath,
+ String localScriptPath, String scriptName) throws IOException {
+ OutputStream out = null;
+ InputStream in = null;
+ try {
+ out = fs.create(new Path(scriptPath, scriptName));
+ in = OozieFeedWorkflowBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage,
+ Map<String, String> props) {
+ // create staging dirs for export at source & set it as distcpSourcePaths
+ String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
+ String sourceStagingDir =
+ FeedHelper.getStagingDir(srcCluster, entity, sourceStorage, Tag.REPLICATION)
+ + "/" + sourceDatedPartitionKey
+ + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
+ props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+ // create staging dirs for import at target & set it as distcpTargetPaths
+ String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
+ String targetStagingDir =
+ FeedHelper.getStagingDir(trgCluster, entity, targetStorage, Tag.REPLICATION)
+ + "/" + targetDatedPartitionKey
+ + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
+ props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+ props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
+ }
+
+ private void propagateLateDataProperties(Feed feed, String instancePaths,
+ String falconFeedStorageType, Map<String, String> props) {
+ // todo these pairs are the same but used in different context
+ // late data handler - should-record action
+ props.put("falconInputFeeds", feed.getName());
+ props.put("falconInPaths", instancePaths);
+
+ // storage type for each corresponding feed - in this case only one feed is involved
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+ // falcon post processing
+ props.put(ARG.feedNames.getPropName(), feed.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+ }
+ }
+
+ private void addOozieRetries(WORKFLOWAPP workflow) {
+ for (Object object : workflow.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+ org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if (FALCON_ACTIONS.contains(actionName)) {
+ decorateWithOozieRetries(action);
+ }
+ }
+ }
+
+ private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+ props.put("userWorkflowName", policy + "-policy");
+ props.put("userWorkflowEngine", "falcon");
+
+ String version;
+ try {
+ version = BuildProperties.get().getProperty("build.version");
+ } catch (Exception e) { // unfortunate that this is only available in prism/webapp
+ version = "0.5";
+ }
+ props.put("userWorkflowVersion", version);
+ }
}
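
Putting the pieces together: the builder is now bound to its entity at construction, so scheduling takes only cluster names. A hedged usage sketch, assuming a populated config store; the feed name "sample-feed" and the cluster names are illustrative, while every call used here appears in the diffs above.

import java.util.Map;
import java.util.Properties;

import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.workflow.WorkflowBuilder;

// Hypothetical driver, not part of this commit.
public final class ScheduleSketch {
    public static void main(String[] args) throws FalconException {
        // Look up a previously submitted feed (name is illustrative).
        Entity feed = ConfigurationStore.get().get(EntityType.FEED, "sample-feed");

        // The entity is passed once, into the builder's constructor...
        WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder("oozie", feed);

        // ...so newWorkflowSchedule and getWorkflowNames no longer take it as an argument.
        Map<String, Properties> schedules = builder.newWorkflowSchedule("clusterA", "clusterB");
        for (Map.Entry<String, Properties> entry : schedules.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        for (String wfName : builder.getWorkflowNames()) {
            System.out.println("workflow: " + wfName);
        }
    }
}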