You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/31 12:14:15 UTC
[2/2] git commit: FALCON-481 Simplify process parent workflow.
Contributed by Shwetha GS
FALCON-481 Simplify process parent workflow. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3bb5a62a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3bb5a62a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3bb5a62a
Branch: refs/heads/master
Commit: 3bb5a62affc5a87df4232865c5e8894aef0333bd
Parents: ed100c8
Author: Shwetha GS <sh...@inmobi.com>
Authored: Thu Jul 31 15:44:05 2014 +0530
Committer: Shwetha GS <sh...@inmobi.com>
Committed: Thu Jul 31 15:44:05 2014 +0530
----------------------------------------------------------------------
.../org/apache/falcon/entity/EntityUtil.java | 14 +
.../apache/falcon/oozie/OozieBundleBuilder.java | 2 +-
.../falcon/oozie/OozieCoordinatorBuilder.java | 20 +-
.../apache/falcon/oozie/OozieEntityBuilder.java | 74 ++---
.../OozieOrchestrationWorkflowBuilder.java | 124 +++++--
.../feed/FSReplicationWorkflowBuilder.java | 70 ++++
.../feed/FeedReplicationCoordinatorBuilder.java | 24 +-
.../feed/FeedReplicationWorkflowBuilder.java | 63 +---
.../feed/FeedRetentionCoordinatorBuilder.java | 4 +-
.../feed/FeedRetentionWorkflowBuilder.java | 42 ++-
.../feed/HCatReplicationWorkflowBuilder.java | 133 ++++++++
.../process/HiveProcessWorkflowBuilder.java | 9 +-
.../process/OozieProcessWorkflowBuilder.java | 9 +-
.../process/PigProcessWorkflowBuilder.java | 12 +-
.../ProcessExecutionCoordinatorBuilder.java | 16 +-
.../ProcessExecutionWorkflowBuilder.java | 66 ++--
.../java/org/apache/falcon/util/OozieUtils.java | 2 +
.../resources/action/feed/eviction-action.xml | 55 ++++
.../action/feed/falcon-table-export.hql | 18 +
.../action/feed/falcon-table-import.hql | 20 ++
.../action/feed/replication-action.xml | 59 ++++
.../resources/action/feed/table-cleanup.xml | 25 ++
.../main/resources/action/feed/table-export.xml | 45 +++
.../main/resources/action/feed/table-import.xml | 42 +++
.../src/main/resources/action/post-process.xml | 94 ++++++
oozie/src/main/resources/action/pre-process.xml | 50 +++
.../resources/action/process/hive-action.xml | 37 +++
.../resources/action/process/oozie-action.xml | 25 ++
.../resources/action/process/pig-action.xml | 40 +++
.../resources/workflow/falcon-table-export.hql | 18 -
.../resources/workflow/falcon-table-import.hql | 20 --
.../workflow/process-parent-workflow.xml | 278 ----------------
.../resources/workflow/replication-workflow.xml | 330 -------------------
.../resources/workflow/retention-workflow.xml | 208 ------------
.../feed/OozieFeedWorkflowBuilderTest.java | 52 ++-
.../falcon/oozie/process/AbstractTestBase.java | 20 +-
.../OozieProcessWorkflowBuilderTest.java | 69 ++--
src/main/examples/entity/hcat/hcat-out-feed.xml | 2 -
.../examples/entity/hcat/hcat-pig-process.xml | 4 +-
39 files changed, 1061 insertions(+), 1134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
index 6f50829..5909113 100644
--- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
+++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java
@@ -672,5 +672,19 @@ public final class EntityUtil {
throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
}
+ public static boolean isTableStorageType(Cluster cluster, Entity entity) throws FalconException {
+ return entity.getEntityType() == EntityType.PROCESS
+ ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
+ }
+
+ public static boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
+ Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+ return Storage.TYPE.TABLE == storageType;
+ }
+
+ public static boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+ Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+ return Storage.TYPE.TABLE == storageType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
index 62d95fa..6185aaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieBundleBuilder.java
@@ -122,7 +122,7 @@ public abstract class OozieBundleBuilder<T extends Entity> extends OozieEntityBu
properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true");
properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, "working") + "/lib");
- if (isTableStorageType(cluster)) {
+ if (EntityUtil.isTableStorageType(cluster, entity)) {
properties.putAll(getHiveCredentials(cluster));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 5f483f0..e354011 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -18,7 +18,6 @@
package org.apache.falcon.oozie;
-import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
@@ -40,10 +39,6 @@ import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.InputStream;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
@@ -121,7 +116,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
DEFAULT_BROKER_MSG_TTL.toString());
props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
- props.put("logDir", getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs")));
+ props.put("logDir", getLogDirectory(cluster));
props.put(OozieClient.EXTERNAL_ID,
new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
"${coord:nominalTime()}").getId());
@@ -164,18 +159,7 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
public abstract List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException;
protected COORDINATORAPP unmarshal(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = OozieCoordinatorBuilder.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked") JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
- unmarshaller.unmarshal(resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
- }
+ return unmarshal(template, OozieUtils.COORD_JAXB_CONTEXT, COORDINATORAPP.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
index 1238b82..7557e3d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -18,19 +18,16 @@
package org.apache.falcon.oozie;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.hadoop.HadoopClientFactory;
@@ -48,8 +45,12 @@ import org.slf4j.LoggerFactory;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -130,6 +131,14 @@ public abstract class OozieEntityBuilder<T extends Entity> {
try {
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+
+ if (LOG.isDebugEnabled()) {
+ StringWriter writer = new StringWriter();
+ marshaller.marshal(jaxbElement, writer);
+ LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+ LOG.debug(writer.getBuffer().toString());
+ }
+
FileSystem fs = HadoopClientFactory.get().createFileSystem(
outPath.toUri(), ClusterHelper.getConfiguration(cluster));
OutputStream out = fs.create(outPath);
@@ -138,12 +147,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
} finally {
out.close();
}
- if (LOG.isDebugEnabled()) {
- StringWriter writer = new StringWriter();
- marshaller.marshal(jaxbElement, writer);
- LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
- LOG.debug(writer.getBuffer().toString());
- }
LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
return outPath;
@@ -152,21 +155,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
}
}
- protected boolean isTableStorageType(Cluster cluster) throws FalconException {
- return entity.getEntityType() == EntityType.PROCESS
- ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
- }
-
- protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
- Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
- return Storage.TYPE.TABLE == storageType;
- }
-
- protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
- Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
- return Storage.TYPE.TABLE == storageType;
- }
-
protected Properties getHiveCredentials(Cluster cluster) {
String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
if (metaStoreUrl == null) {
@@ -238,21 +226,6 @@ public abstract class OozieEntityBuilder<T extends Entity> {
return properties;
}
- protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
- String prefix = "falcon_" + input.getName();
-
- propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
- props.put(prefix + "_partition_filter_pig",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
- props.put(prefix + "_partition_filter_hive",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
- props.put(prefix + "_partition_filter_java",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
- props.put(prefix + "_datain_partitions_hive",
- "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
- }
-
protected void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage, Properties props) {
String prefix = "falcon_" + output.getName();
@@ -304,4 +277,23 @@ public abstract class OozieEntityBuilder<T extends Entity> {
prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name);
return prop;
}
+
+ protected String getLogDirectory(Cluster cluster) {
+ return getStoragePath(new Path(EntityUtil.getBaseStagingPath(cluster, entity), "logs"));
+ }
+
+ protected <T> T unmarshal(String template, JAXBContext context, Class<T> cls) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieEntityBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = context.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<T> jaxbElement = unmarshaller.unmarshal(new StreamSource(resourceAsStream), cls);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException("Failed to unmarshal " + template, e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index 2ef162b..083f807 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -22,19 +22,24 @@ import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.oozie.feed.FeedReplicationWorkflowBuilder;
+import org.apache.falcon.oozie.feed.FSReplicationWorkflowBuilder;
import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
+import org.apache.falcon.oozie.feed.HCatReplicationWorkflowBuilder;
import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CREDENTIAL;
import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.END;
+import org.apache.falcon.oozie.workflow.KILL;
+import org.apache.falcon.oozie.workflow.START;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.OozieUtils;
@@ -44,12 +49,8 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashSet;
@@ -63,8 +64,22 @@ import java.util.Set;
*/
public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+
+ protected static final String USER_ACTION_NAME = "user-action";
+ protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
+ protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
+ protected static final String FAIL_POSTPROCESS_ACTION_NAME = "failed-post-processing";
+ protected static final String OK_ACTION_NAME = "end";
+ protected static final String FAIL_ACTION_NAME = "fail";
+
+
+ private static final String POSTPROCESS_TEMPLATE = "/action/post-process.xml";
+ private static final String PREPROCESS_TEMPLATE = "/action/pre-process.xml";
+
public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
- Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+ Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME,
+ FAIL_POSTPROCESS_ACTION_NAME, }));
+
private final Tag lifecycle;
public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
@@ -72,7 +87,8 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
this.lifecycle = lifecycle;
}
- public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Tag lifecycle) {
+ public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+ throws FalconException {
switch(entity.getEntityType()) {
case FEED:
Feed feed = (Feed) entity;
@@ -81,7 +97,12 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return new FeedRetentionWorkflowBuilder(feed);
case REPLICATION:
- return new FeedReplicationWorkflowBuilder(feed);
+ boolean isTable = EntityUtil.isTableStorageType(cluster, feed);
+ if (isTable) {
+ return new HCatReplicationWorkflowBuilder(feed);
+ } else {
+ return new FSReplicationWorkflowBuilder(feed);
+ }
default:
throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
@@ -110,24 +131,76 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle);
}
+ protected void addTransition(ACTION action, String ok, String fail) {
+ action.getOk().setTo(ok);
+ action.getError().setTo(fail);
+ }
+
+ protected void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction) {
+ wf.setName(name);
+ wf.setStart(new START());
+ wf.getStart().setTo(startAction);
+
+ wf.setEnd(new END());
+ wf.getEnd().setName(OK_ACTION_NAME);
+
+ KILL kill = new KILL();
+ kill.setName(FAIL_ACTION_NAME);
+ kill.setMessage("Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]");
+ wf.getDecisionOrForkOrJoin().add(kill);
+ }
+
+ protected ACTION getSuccessPostProcessAction() throws FalconException {
+ ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+ decorateWithOozieRetries(action);
+ return action;
+ }
+
+ protected ACTION getFailPostProcessAction() throws FalconException {
+ ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
+ decorateWithOozieRetries(action);
+ action.setName(FAIL_POSTPROCESS_ACTION_NAME);
+ return action;
+ }
+
+ protected ACTION getPreProcessingAction(boolean isTableStorageType, Tag tag) throws FalconException {
+ ACTION action = unmarshalAction(PREPROCESS_TEMPLATE);
+ decorateWithOozieRetries(action);
+ if (isTableStorageType) {
+ // adds hive-site.xml in actions classpath
+ action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+ }
+
+ List<String> args = action.getJava().getArg();
+ args.add("-out");
+ if (tag == Tag.REPLICATION) {
+ args.add("${logDir}/latedata/${nominalTime}/${srcClusterName}");
+ } else {
+ args.add("${logDir}/latedata/${nominalTime}");
+ }
+ return action;
+ }
+
protected Path marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
return marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
}
protected WORKFLOWAPP unmarshal(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked")
- JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
+ return unmarshal(template, OozieUtils.WORKFLOW_JAXB_CONTEXT, WORKFLOWAPP.class);
+ }
+
+ protected ACTION unmarshalAction(String template) throws FalconException {
+ return unmarshal(template, OozieUtils.ACTION_JAXB_CONTEXT, ACTION.class);
+ }
+
+ protected boolean shouldPreProcess() throws FalconException {
+ if (EntityUtil.getLateProcess(entity) == null
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+ return false;
}
+ return true;
}
protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
@@ -281,19 +354,6 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
return property;
}
- protected void addOozieRetries(WORKFLOWAPP workflow) {
- for (Object object : workflow.getDecisionOrForkOrJoin()) {
- if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
- continue;
- }
- org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
- String actionName = action.getName();
- if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- }
- }
- }
-
protected void decorateWithOozieRetries(ACTION action) {
Properties props = RuntimeProperties.get();
action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..6feb32e
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FSReplicationWorkflowBuilder.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+
+import java.util.Arrays;
+
+/**
+ * Builds replication workflow for filesystem based feed.
+ */
+public class FSReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder{
+ public FSReplicationWorkflowBuilder(Feed entity) {
+ super(entity);
+ }
+
+ @Override protected WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException {
+ WORKFLOWAPP workflow = new WORKFLOWAPP();
+ String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(src.getName()), entity).toString();
+
+ String start = REPLICATION_ACTION_NAME;
+
+ //Add pre-processing
+ if (shouldPreProcess()) {
+ ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+ addTransition(action, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
+ start = PREPROCESS_ACTION_NAME;
+ }
+
+ //Add replication
+ ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+ addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(replication);
+
+ //Add post-processing actions
+ ACTION success = getSuccessPostProcessAction();
+ addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = getFailPostProcessAction();
+ addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(fail);
+
+ decorateWorkflow(workflow, wfName, start);
+ return workflow;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 0b582ef..2798db2 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -63,6 +63,9 @@ import java.util.Properties;
*/
public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
+ private static final String IMPORT_HQL = "/action/feed/falcon-table-import.hql";
+ private static final String EXPORT_HQL = "/action/feed/falcon-table-export.hql";
+
private static final int THIRTY_MINUTES = 30 * 60 * 1000;
private static final String PARALLEL = "parallel";
@@ -96,8 +99,9 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
// Different workflow for each source since hive credentials vary for each cluster
- OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
- builder.build(trgCluster, buildPath);
+ OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
+ Tag.REPLICATION);
+ Properties wfProps = builder.build(trgCluster, buildPath);
long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
@@ -136,7 +140,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
coord.setAction(replicationWorkflowAction);
Path marshalPath = marshal(trgCluster, coord, buildPath);
- return getProperties(marshalPath, coordName);
+ wfProps.putAll(getProperties(marshalPath, coordName));
+ return wfProps;
}
private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
@@ -144,7 +149,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
ACTION action = new ACTION();
WORKFLOW workflow = new WORKFLOW();
- workflow.setAppPath(getStoragePath(new Path(buildPath, "workflow.xml")));
+ workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
@@ -266,8 +271,8 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
try {
// copy import export scripts to stagingDir
Path scriptPath = new Path(buildPath, "scripts");
- copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-export.hql");
- copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-import.hql");
+ copyHiveScript(fs, scriptPath, IMPORT_HQL);
+ copyHiveScript(fs, scriptPath, EXPORT_HQL);
// create hive conf to stagingDir
Path confPath = new Path(buildPath + "/conf");
@@ -278,13 +283,12 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
}
}
- private void copyHiveScript(FileSystem fs, Path scriptPath, String localScriptPath,
- String scriptName) throws IOException {
+ private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
OutputStream out = null;
InputStream in = null;
try {
- out = fs.create(new Path(scriptPath, scriptName));
- in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+ out = fs.create(new Path(scriptPath, new Path(resource).getName()));
+ in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(resource);
IOUtils.copy(in, out);
} finally {
IOUtils.closeQuietly(in);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 2537725..eafef32 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -34,67 +34,34 @@ import java.util.Properties;
/**
* Builds feed replication workflow, one per source-target cluster combination.
*/
-public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
- private static final String REPLICATION_WF_TEMPLATE = "/workflow/replication-workflow.xml";
- private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
- private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+ protected static final String REPLICATION_ACTION_TEMPLATE = "/action/feed/replication-action.xml";
+ protected static final String REPLICATION_ACTION_NAME = "replication";
public FeedReplicationWorkflowBuilder(Feed entity) {
super(entity, Tag.REPLICATION);
}
+ /**
+ * Builds the replication workflow for the target {@code cluster}. The source cluster is the
+ * cluster named by the last component of {@code buildPath}.
+ * NOTE(review): getWorkflow() implementations may already name the workflow
+ * (HCatReplicationWorkflowBuilder derives a name that includes the source cluster), but
+ * setName() below replaces it with the generic replication name - confirm this is intended.
+ * @return the workflow properties from getProperties() plus falconDataOperation=REPLICATE
+ */
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
- WORKFLOWAPP workflow = unmarshal(REPLICATION_WF_TEMPLATE);
Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
+
+ WORKFLOWAPP workflow = getWorkflow(srcCluster, cluster);
String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
workflow.setName(wfName);
addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
- addOozieRetries(workflow);
-
- if (isTableStorageType(cluster)) {
- setupHiveCredentials(cluster, srcCluster, workflow);
- }
-
- Path marshalPath = marshal(cluster, workflow, buildPath);
- return getProperties(marshalPath, wfName);
+ marshal(cluster, workflow, buildPath);
+ Properties props = getProperties(buildPath, wfName);
+ props.putAll(getWorkflowProperties());
+ return props;
}
- private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) {
- if (isSecurityEnabled) {
- // add hcatalog credentials for secure mode and add a reference to each action
- addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
- addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
- }
-
- // hive-site.xml file is created later in coordinator initialization but
- // actions are set to point to that here
-
- for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
- if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
- continue;
- }
-
- org.apache.falcon.oozie.workflow.ACTION action =
- (org.apache.falcon.oozie.workflow.ACTION) object;
- String actionName = action.getName();
- if ("recordsize".equals(actionName)) {
- // add reference to hive-site conf to each action
- action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
-
- if (isSecurityEnabled) { // add a reference to credential in the action
- action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
- }
- } else if ("table-export".equals(actionName)) {
- if (isSecurityEnabled) { // add a reference to credential in the action
- action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
- }
- } else if ("table-import".equals(actionName)) {
- if (isSecurityEnabled) { // add a reference to credential in the action
- action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
- }
- }
- }
+ /** Replication-specific workflow properties: sets falconDataOperation to REPLICATE. */
+ private Properties getWorkflowProperties() {
+ Properties props = new Properties();
+ props.setProperty("falconDataOperation", "REPLICATE");
+ return props;
}
+
+ /**
+ * Creates the storage-specific replication workflow.
+ * @param src cluster the feed is replicated from
+ * @param target cluster the feed is replicated to
+ */
+ protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index ac38532..2238778 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -95,8 +95,10 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
props.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
WORKFLOW workflow = new WORKFLOW();
- Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, Tag.RETENTION).build(cluster, coordPath);
+ Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
+ coordPath);
workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ // also pass every property returned by the orchestration workflow's build() into the workflow configuration
+ props.putAll(wfProp);
workflow.setConfiguration(getConfig(props));
ACTION action = new ACTION();
action.setWorkflow(workflow);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index eee4fe9..3aabb19 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -24,6 +24,7 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.hadoop.fs.Path;
@@ -33,25 +34,50 @@ import java.util.Properties;
* Builds feed retention workflow.
*/
public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
- private static final String RETENTION_WF_TEMPLATE = "/workflow/retention-workflow.xml";
+ private static final String EVICTION_ACTION_TEMPLATE = "/action/feed/eviction-action.xml";
+
+ private static final String EVICTION_ACTION_NAME = "eviction";
public FeedRetentionWorkflowBuilder(Feed entity) {
super(entity, Tag.DEFAULT);
}
+ /**
+ * Builds the retention workflow: the eviction action (the start action) followed by
+ * success/failure post-processing, marshalled under buildPath. Hive credentials are set up
+ * when the feed uses table storage on this cluster.
+ * @return the workflow properties from getProperties() plus the retention-specific properties
+ */
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
- WORKFLOWAPP workflow = unmarshal(RETENTION_WF_TEMPLATE);
+ WORKFLOWAPP workflow = new WORKFLOWAPP();
String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
- workflow.setName(wfName);
+
+ //Add eviction action
+ ACTION eviction = unmarshalAction(EVICTION_ACTION_TEMPLATE);
+ addTransition(eviction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(eviction);
+
+ //Add post-processing actions
+ ACTION success = getSuccessPostProcessAction();
+ addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = getFailPostProcessAction();
+ addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(fail);
+
+ decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
- addOozieRetries(workflow);
- if (isTableStorageType(cluster)) {
+ if (EntityUtil.isTableStorageType(cluster, entity)) {
setupHiveCredentials(cluster, buildPath, workflow);
}
- Path marshalPath = marshal(cluster, workflow, buildPath);
- return getProperties(marshalPath, wfName);
+ marshal(cluster, workflow, buildPath);
+ Properties props = getProperties(buildPath, wfName);
+ props.putAll(getWorkflowProperties());
+ return props;
+ }
+
+ /** Retention-specific workflow properties: falconDataOperation is DELETE and srcClusterName is the placeholder "NA". */
+ private Properties getWorkflowProperties() {
+ Properties props = new Properties();
+ props.setProperty("falconDataOperation", "DELETE");
+ props.setProperty("srcClusterName", "NA");
+ return props;
}
private void setupHiveCredentials(Cluster cluster, Path wfPath,
@@ -72,7 +98,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
org.apache.falcon.oozie.workflow.ACTION action =
(org.apache.falcon.oozie.workflow.ACTION) object;
String actionName = action.getName();
- if ("eviction".equals(actionName)) {
+ if (EVICTION_ACTION_NAME.equals(actionName)) {
// add reference to hive-site conf to each action
action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..61739a5
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+
+import java.util.Arrays;
+
+/**
+ * Builds replication workflow for hcat based feed.
+ */
+public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuilder {
+ private static final String EXPORT_ACTION_TEMPLATE = "/action/feed/table-export.xml";
+ private static final String IMPORT_ACTION_TEMPLATE = "/action/feed/table-import.xml";
+ private static final String CLEANUP_ACTION_TEMPLATE = "/action/feed/table-cleanup.xml";
+
+ private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
+ private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+ public static final String EXPORT_ACTION_NAME = "table-export";
+ public static final String IMPORT_ACTION_NAME = "table-import";
+ private static final String CLEANUP_ACTION_NAME = "cleanup-table-staging-dir";
+
+ public HCatReplicationWorkflowBuilder(Feed entity) {
+ super(entity);
+ }
+
+ /**
+ * Chains the actions: optional pre-processing, table-export, replication, table-import and
+ * cleanup-table-staging-dir, then success post-processing. A failure in any of these data
+ * actions transitions to the failed post-processing action.
+ */
+ @Override protected WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException {
+ WORKFLOWAPP workflow = new WORKFLOWAPP();
+ String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(src.getName()), entity).toString();
+ // NOTE(review): FeedReplicationWorkflowBuilder.build() later calls setName() with a name that
+ // omits the source cluster, replacing this one - confirm which name is intended.
+
+ String start = EXPORT_ACTION_NAME;
+
+ //Add pre-processing
+ if (shouldPreProcess()) {
+ // NOTE(review): table-storage flag is false, presumably because setupHiveCredentials()
+ // sets the source hive-site job-xml on this action itself
+ ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+ addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(action);
+ start = PREPROCESS_ACTION_NAME;
+ }
+
+ //Add export action
+ ACTION export = unmarshalAction(EXPORT_ACTION_TEMPLATE);
+ addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(export);
+
+ //Add replication
+ ACTION replication = unmarshalAction(REPLICATION_ACTION_TEMPLATE);
+ addTransition(replication, IMPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(replication);
+
+ //Add import action
+ ACTION importAction = unmarshalAction(IMPORT_ACTION_TEMPLATE);
+ addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(importAction);
+
+ //Add cleanup action
+ ACTION cleanup = unmarshalAction(CLEANUP_ACTION_TEMPLATE);
+ addTransition(cleanup, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(cleanup);
+
+ //Add post-processing actions
+ ACTION success = getSuccessPostProcessAction();
+ addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = getFailPostProcessAction();
+ addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+ workflow.getDecisionOrForkOrJoin().add(fail);
+
+ decorateWorkflow(workflow, wfName, start);
+ setupHiveCredentials(src, target, workflow);
+ return workflow;
+ }
+
+ /**
+ * Points the pre-processing action at the source hive-site.xml and, when security is enabled,
+ * adds HCatalog credentials for both clusters and references them from the actions: the source
+ * credential for pre-processing and table-export, the target credential for table-import.
+ */
+ private void setupHiveCredentials(Cluster sourceCluster, Cluster targetCluster, WORKFLOWAPP workflowApp) {
+ if (isSecurityEnabled) {
+ // add hcatalog credentials for secure mode and add a reference to each action
+ addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
+ addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
+ }
+
+ // hive-site.xml file is created later in coordinator initialization but
+ // actions are set to point to that here
+
+ for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+
+ org.apache.falcon.oozie.workflow.ACTION action =
+ (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if (PREPROCESS_ACTION_NAME.equals(actionName)) {
+ // add reference to hive-site conf to each action
+ action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
+
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+ }
+ } else if (EXPORT_ACTION_NAME.equals(actionName)) {
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+ }
+ } else if (IMPORT_ACTION_NAME.equals(actionName)) {
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
index 79a1883..358475d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -35,14 +35,14 @@ import java.util.List;
* Builds orchestration workflow for process where engine is hive.
*/
public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+ private static final String ACTION_TEMPLATE = "/action/process/hive-action.xml";
+
public HiveProcessWorkflowBuilder(Process entity) {
super(entity);
}
- @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
- if (!action.getName().equals("user-hive-job")) {
- return;
- }
+ /** Builds the user's hive action from ACTION_TEMPLATE; the embedded hive action is edited via JAXB and marshalled back. */
+ @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+ ACTION action = unmarshalAction(ACTION_TEMPLATE);
JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(action);
org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
@@ -65,6 +65,7 @@ public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
buildPath));
OozieUtils.marshalHiveAction(action, actionJaxbElement);
+ return action;
}
private void propagateEntityProperties(org.apache.falcon.oozie.hive.ACTION hiveAction) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
index 977d8c1..14668f0 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -29,15 +29,16 @@ import org.apache.hadoop.fs.Path;
* Builds oozie workflow for process where the engine is oozie.
*/
public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+ private static final String ACTION_TEMPLATE = "/action/process/oozie-action.xml";
+
public OozieProcessWorkflowBuilder(Process entity) {
super(entity);
}
- @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
- if (!action.getName().equals("user-oozie-workflow")) {
- return;
- }
+ /** Builds the user action from ACTION_TEMPLATE, pointing its sub-workflow at the user's workflow path. */
+ @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+ ACTION action = unmarshalAction(ACTION_TEMPLATE);
action.getSubWorkflow().setAppPath(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster,
buildPath)));
+ return action;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
index 29f601d..6a83ddf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie.process;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.Process;
@@ -34,15 +35,14 @@ import java.util.List;
* Builds orchestration workflow for process where engine is pig.
*/
public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+ private static final String ACTION_TEMPLATE = "/action/process/pig-action.xml";
public PigProcessWorkflowBuilder(Process entity) {
super(entity);
}
- @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
- if (!action.getName().equals("user-pig-job")) {
- return;
- }
+ /** Builds the user's pig action from ACTION_TEMPLATE and sets its script path, properties and libraries for this cluster. */
+ @Override protected ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException {
+ ACTION action = unmarshalAction(ACTION_TEMPLATE);
PIG pigAction = action.getPig();
Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
@@ -56,12 +56,14 @@ public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
propagateEntityProperties(pigAction.getConfiguration(), pigAction.getParam());
- if (isTableStorageType(cluster)) { // adds hive-site.xml in pig classpath
+ if (EntityUtil.isTableStorageType(cluster, entity)) { // adds hive-site.xml in pig classpath
pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
}
addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
buildPath));
+
+ return action;
}
private void addPrepareDeleteOutputPath(PIG pigAction) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index e46ae6e..e907087 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -89,10 +89,12 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
propagateUserWorkflowProperties(processWorkflow, props);
// create parent wf
- Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, Tag.DEFAULT).build(cluster, coordPath);
+ Properties wfProps = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.DEFAULT).build(cluster,
+ coordPath);
WORKFLOW wf = new WORKFLOW();
wf.setAppPath(getStoragePath(wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ // also pass every property returned by the parent workflow's build() into the workflow configuration
+ props.putAll(wfProps);
wf.setConfiguration(getConfig(props));
// set coord action to parent wf
@@ -333,4 +335,16 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
props.put("userWorkflowEngine", processWorkflow.getEngine().value());
}
+ /**
+ * Adds the shared catalog table properties for an input under the prefix falcon_{input name},
+ * plus pig, hive and java partition filters as coord:dataInPartitionFilter EL expressions.
+ */
+ protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
+ String prefix = "falcon_" + input.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_partition_filter_pig",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+ props.put(prefix + "_partition_filter_hive",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+ props.put(prefix + "_partition_filter_java",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 2e3a5c1..2eae7ca 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -53,50 +53,66 @@ import java.util.Set;
* Base class for building orchestration workflow for process.
*/
public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Process> {
- private static final String DEFAULT_WF_TEMPLATE = "/workflow/process-parent-workflow.xml";
+
private static final Set<String> FALCON_PROCESS_HIVE_ACTIONS = new HashSet<String>(
- Arrays.asList(new String[]{"recordsize", "user-oozie-workflow", "user-pig-job", "user-hive-job", }));
+ Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, USER_ACTION_NAME, }));
protected ProcessExecutionWorkflowBuilder(Process entity) {
super(entity, Tag.DEFAULT);
}
+ /**
+ * Builds the process parent workflow: optional pre-processing, the engine-specific user action,
+ * then success/failure post-processing, marshalled under buildPath.
+ * NOTE(review): confirm oozie retries are still applied to the falcon actions (e.g. by
+ * decorateWorkflow()); this method no longer calls decorateWithOozieRetries() directly.
+ * @return the workflow properties from getProperties() plus the process-specific properties
+ */
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
- WORKFLOWAPP wfApp = unmarshal(DEFAULT_WF_TEMPLATE);
+ WORKFLOWAPP wfApp = new WORKFLOWAPP();
String wfName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
- wfApp.setName(wfName);
+
+ String startAction = USER_ACTION_NAME;
+ final boolean isTableStorageType = EntityUtil.isTableStorageType(cluster, entity);
+
+ //Add pre-processing action
+ if (shouldPreProcess()) {
+ ACTION preProcessAction = getPreProcessingAction(isTableStorageType, Tag.DEFAULT);
+ addTransition(preProcessAction, USER_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ wfApp.getDecisionOrForkOrJoin().add(preProcessAction);
+ startAction = PREPROCESS_ACTION_NAME;
+ }
+
+ //Add user action
+ ACTION userAction = getUserAction(cluster, buildPath);
+ addTransition(userAction, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
+ wfApp.getDecisionOrForkOrJoin().add(userAction);
+
+ //Add post-processing
+ ACTION success = getSuccessPostProcessAction();
+ addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
+ wfApp.getDecisionOrForkOrJoin().add(success);
+
+ ACTION fail = getFailPostProcessAction();
+ addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
+ wfApp.getDecisionOrForkOrJoin().add(fail);
+
+ decorateWorkflow(wfApp, wfName, startAction);
addLibExtensionsToWorkflow(cluster, wfApp, null);
- final boolean isTableStorageType = isTableStorageType(cluster);
if (isTableStorageType) {
setupHiveCredentials(cluster, buildPath, wfApp);
}
- for (Object object : wfApp.getDecisionOrForkOrJoin()) {
- if (!(object instanceof ACTION)) {
- continue;
- }
+ marshal(cluster, wfApp, buildPath);
+ Properties props = getProperties(buildPath, wfName);
+ props.putAll(getWorkflowProperties());
- ACTION action = (ACTION) object;
- String actionName = action.getName();
- if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- if (isTableStorageType && actionName.equals("recordsize")) {
- // adds hive-site.xml in actions classpath
- action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
- }
- }
-
- decorateAction(action, cluster, buildPath);
- }
+ return props;
+ }
- //Create parent workflow
- Path marshalPath = marshal(cluster, wfApp, buildPath);
- return getProperties(marshalPath, wfName);
+ /**
+ * Process-specific workflow properties: falconDataOperation is GENERATE and srcClusterName is
+ * the placeholder "NA". Takes no arguments, matching the feed workflow builders.
+ */
+ private Properties getWorkflowProperties() {
+ Properties props = new Properties();
+ props.setProperty("falconDataOperation", "GENERATE");
+ props.setProperty("srcClusterName", "NA");
+ return props;
}
- protected abstract void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException;
+ /**
+ * Creates the engine-specific user action (implemented by the oozie, pig and hive builders).
+ * @param cluster cluster the workflow is being built for
+ * @param buildPath path the workflow is being built under
+ * @return the user action; build() sets its ok/error transitions
+ * @throws FalconException if the action cannot be built
+ */
+ protected abstract ACTION getUserAction(Cluster cluster, Path buildPath) throws FalconException;
private void setupHiveCredentials(Cluster cluster, Path buildPath, WORKFLOWAPP wfApp) throws FalconException {
// create hive-site.xml file so actions can use it in the classpath
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 9e1c82d..0ae229c 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -40,6 +40,7 @@ import java.util.Properties;
*/
public final class OozieUtils {
public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+ /** JAXB context for a standalone workflow ACTION element, as opposed to a whole WORKFLOWAPP. */
+ public static final JAXBContext ACTION_JAXB_CONTEXT;
public static final JAXBContext COORD_JAXB_CONTEXT;
public static final JAXBContext BUNDLE_JAXB_CONTEXT;
protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
@@ -47,6 +48,7 @@ public final class OozieUtils {
static {
try {
WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+ ACTION_JAXB_CONTEXT = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.ACTION.class);
COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml
new file mode 100644
index 0000000..6d03eb0
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/eviction-action.xml
@@ -0,0 +1,61 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!--
+ Feed retention action: runs org.apache.falcon.retention.FeedEvictor with instance based
+ retention. The limit, timeZone and frequency come from workflow parameters, and a CSV of
+ instance paths under logDir (one per nominal time) is passed as the log file.
+ FeedRetentionWorkflowBuilder sets the ok/error transitions again when it adds this action.
+-->
+<action name="eviction" xmlns="uri:oozie:workflow:0.3">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <!-- HCatalog jars -->
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.retention.FeedEvictor</main-class>
+ <arg>-feedBasePath</arg>
+ <arg>${feedDataPath}</arg>
+ <arg>-falconFeedStorageType</arg>
+ <arg>${falconFeedStorageType}</arg>
+ <arg>-retentionType</arg>
+ <arg>instance</arg>
+ <arg>-retentionLimit</arg>
+ <arg>${limit}</arg>
+ <arg>-timeZone</arg>
+ <arg>${timeZone}</arg>
+ <arg>-frequency</arg>
+ <arg>${frequency}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ </java>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/falcon-table-export.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/falcon-table-export.hql b/oozie/src/main/resources/action/feed/falcon-table-export.hql
new file mode 100644
index 0000000..37fd1b7
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/falcon-table-export.hql
@@ -0,0 +1,19 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- Export the source table partition into the source staging dir; the replication action then copies it to the target.
+export table ${falconSourceDatabase}.${falconSourceTable} partition ${falconSourcePartition} to '${falconSourceStagingDir}';
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/falcon-table-import.hql
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/falcon-table-import.hql b/oozie/src/main/resources/action/feed/falcon-table-import.hql
new file mode 100644
index 0000000..653d580
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/falcon-table-import.hql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+-- Replace the target partition: drop it if it exists, then import it from the target staging dir.
+use ${falconTargetDatabase};
+alter table ${falconTargetTable} drop if exists partition ${falconTargetPartition};
+import table ${falconTargetTable} partition ${falconTargetPartition} from '${falconTargetStagingDir}';
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/replication-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/replication-action.xml b/oozie/src/main/resources/action/feed/replication-action.xml
new file mode 100644
index 0000000..9da0396
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/replication-action.xml
@@ -0,0 +1,59 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!-- Replication action: runs org.apache.falcon.replication.FeedReplicator as an Oozie java action to copy distcpSourcePaths to distcpTargetPaths (hadoop-distcp.jar is shipped with the action). It is also the ok-target of table-export. -->
+<action name="replication" xmlns="uri:oozie:workflow:0.3">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property> <!-- hadoop 1 parameter -->
+ <name>oozie.launcher.mapreduce.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property> <!-- oozie.launcher.* properties are applied to the Oozie launcher job -->
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
+ <arg>-Dfalcon.include.path=${sourceRelativePaths}</arg> <!-- presumably restricts the copy to these relative paths - confirm -->
+ <arg>-Dmapred.job.queue.name=${queueName}</arg> <!-- queue and priority are also passed as -D args, presumably for the copy job FeedReplicator submits - confirm -->
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-maxMaps</arg>
+ <arg>${maxMaps}</arg>
+ <arg>-mapBandwidthKB</arg>
+ <arg>${mapBandwidthKB}</arg>
+ <arg>-sourcePaths</arg>
+ <arg>${distcpSourcePaths}</arg>
+ <arg>-targetPath</arg>
+ <arg>${distcpTargetPaths}</arg>
+ <arg>-falconFeedStorageType</arg>
+ <arg>${falconFeedStorageType}</arg>
+ <file>${wf:conf("falcon.libpath")}/hadoop-distcp.jar</file>
+ </java>
+ <ok to="succeeded-post-processing"/> <!-- NOTE(review): for table replication this presumably gets redirected to table-import by the workflow builder - confirm -->
+ <error to="failed-post-processing"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-cleanup.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-cleanup.xml b/oozie/src/main/resources/action/feed/table-cleanup.xml
new file mode 100644
index 0000000..23e8df1
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-cleanup.xml
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<action name="cleanup-table-staging-dir" xmlns='uri:oozie:workflow:0.3'> <!-- Table replication cleanup, reached after table-import succeeds: deletes the source-side export staging dir and the target-side import staging dir. -->
+ <fs>
+ <delete path="${distcpSourcePaths}"/>
+ <delete path="${distcpTargetPaths}"/>
+ </fs>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/> <!-- NOTE(review): a cleanup failure is reported as a failed run even though the import already succeeded - confirm intended -->
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-export.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-export.xml b/oozie/src/main/resources/action/feed/table-export.xml
new file mode 100644
index 0000000..f5f7e66
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-export.xml
@@ -0,0 +1,45 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!-- Table Replication - Export data and metadata to HDFS Staging from Source Hive. Runs falcon-table-export.hql against the source cluster, exporting into distcpSourcePaths, which the replication action then copies. -->
+<action name="table-export" xmlns='uri:oozie:workflow:0.3'>
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${falconSourceJobTracker}</job-tracker>
+ <name-node>${falconSourceNameNode}</name-node>
+ <prepare> <!-- removes any existing staging dir first, presumably because Hive EXPORT needs an empty or missing target dir - confirm -->
+ <delete path="${distcpSourcePaths}"/>
+ </prepare>
+ <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>${wf:appPath()}/scripts/falcon-table-export.hql</script>
+ <param>falconSourceDatabase=${falconSourceDatabase}</param>
+ <param>falconSourceTable=${falconSourceTable}</param>
+ <param>falconSourcePartition=${falconSourcePartition}</param>
+ <param>falconSourceStagingDir=${distcpSourcePaths}</param> <!-- same dir the prepare step clears and the replication action reads -->
+ </hive>
+ <ok to="replication"/>
+ <error to="failed-post-processing"/>
+</action>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/feed/table-import.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/table-import.xml b/oozie/src/main/resources/action/feed/table-import.xml
new file mode 100644
index 0000000..6e9a073
--- /dev/null
+++ b/oozie/src/main/resources/action/feed/table-import.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<!-- Table Replication - Import data and metadata from HDFS Staging into Target Hive. Runs falcon-table-import.hql against the target cluster, importing from distcpTargetPaths, then hands off to cleanup-table-staging-dir. -->
+<action name="table-import" xmlns='uri:oozie:workflow:0.3'>
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${falconTargetJobTracker}</job-tracker>
+ <name-node>${falconTargetNameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>${wf:appPath()}/scripts/falcon-table-import.hql</script>
+ <param>falconTargetDatabase=${falconTargetDatabase}</param>
+ <param>falconTargetTable=${falconTargetTable}</param>
+ <param>falconTargetPartition=${falconTargetPartition}</param>
+ <param>falconTargetStagingDir=${distcpTargetPaths}</param> <!-- the replication action's target path, where the copied export lands -->
+ </hive>
+ <ok to="cleanup-table-staging-dir"/>
+ <error to="failed-post-processing"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
new file mode 100644
index 0000000..1631d63
--- /dev/null
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -0,0 +1,94 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<action name='succeeded-post-processing' xmlns="uri:oozie:workflow:0.3"> <!-- Post-processing: runs FalconPostProcessing with the entity, workflow and feed context passed as args below, then ends the workflow. Other actions route errors to failed-post-processing, presumably generated from this same template by the builder - confirm. -->
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.workflow.FalconPostProcessing</main-class>
+ <arg>-cluster</arg>
+ <arg>${cluster}</arg>
+ <arg>-entityType</arg>
+ <arg>${entityType}</arg>
+ <arg>-entityName</arg>
+ <arg>${entityName}</arg>
+ <arg>-nominalTime</arg>
+ <arg>${nominalTime}</arg>
+ <arg>-operation</arg>
+ <arg>${falconDataOperation}</arg>
+ <arg>-workflowId</arg>
+ <arg>${wf:id()}</arg>
+ <arg>-runId</arg>
+ <arg>${wf:run()}</arg>
+ <arg>-status</arg>
+ <arg>${wf:lastErrorNode() == '' ? 'SUCCEEDED' : 'FAILED'}</arg> <!-- FAILED if any earlier action in this run ended in error -->
+ <arg>-timeStamp</arg>
+ <arg>${timeStamp}</arg>
+ <arg>-brokerImplClass</arg>
+ <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>-brokerUrl</arg>
+ <arg>${wf:conf("broker.url")}</arg>
+ <arg>-userBrokerImplClass</arg>
+ <arg>${userBrokerImplClass}</arg>
+ <arg>-userBrokerUrl</arg>
+ <arg>${userBrokerUrl}</arg>
+ <arg>-brokerTTL</arg>
+ <arg>${wf:conf("broker.ttlInMins")}</arg>
+ <arg>-feedNames</arg>
+ <arg>${feedNames}</arg>
+ <arg>-feedInstancePaths</arg>
+ <arg>${feedInstancePaths}</arg>
+ <arg>-logFile</arg>
+ <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+ <arg>-workflowEngineUrl</arg>
+ <arg>${workflowEngineUrl}</arg>
+ <arg>-subflowId</arg>
+ <arg>${wf:id()}${userWorkflowEngine == "oozie" ? "@user-action" : ""}</arg> <!-- for oozie user workflows, identifies the user-action sub-workflow action of this job -->
+ <arg>-userWorkflowEngine</arg>
+ <arg>${userWorkflowEngine}</arg>
+ <arg>-userWorkflowName</arg>
+ <arg>${userWorkflowName}</arg>
+ <arg>-userWorkflowVersion</arg>
+ <arg>${userWorkflowVersion}</arg>
+ <arg>-logDir</arg>
+ <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}/</arg> <!-- a srcClusterName of NA is mapped to an empty path segment -->
+ <arg>-workflowUser</arg>
+ <arg>${wf:user()}</arg>
+ <arg>-falconInputFeeds</arg>
+ <arg>${falconInputFeeds}</arg>
+ <arg>-falconInputPaths</arg>
+ <arg>${falconInPaths}</arg>
+ <file>${wf:conf("falcon.libpath")}/activemq-core.jar</file>
+ <file>${wf:conf("falcon.libpath")}/geronimo-j2ee-management.jar</file>
+ <file>${wf:conf("falcon.libpath")}/jms.jar</file>
+ <file>${wf:conf("falcon.libpath")}/json-simple.jar</file>
+ <file>${wf:conf("falcon.libpath")}/oozie-client.jar</file>
+ <file>${wf:conf("falcon.libpath")}/spring-jms.jar</file>
+ </java>
+ <ok to="end"/>
+ <error to="fail"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/pre-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/pre-process.xml b/oozie/src/main/resources/action/pre-process.xml
new file mode 100644
index 0000000..127ab80
--- /dev/null
+++ b/oozie/src/main/resources/action/pre-process.xml
@@ -0,0 +1,50 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<action name='pre-processing' xmlns="uri:oozie:workflow:0.3"> <!-- Pre-processing: runs LateDataHandler over the input feed paths and captures its output, then continues to user-action. -->
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <!-- HCatalog jars -->
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
+ <arg>-out</arg>
+ <arg>${logDir}/latedata/${nominalTime}/${srcClusterName}</arg> <!-- NOTE(review): unlike the post-processing logDir arg, srcClusterName is used as-is here (no NA mapping) - confirm intended -->
+ <arg>-paths</arg>
+ <arg>${falconInPaths}</arg>
+ <arg>-falconInputFeeds</arg>
+ <arg>${falconInputFeeds}</arg>
+ <arg>-falconInputFeedStorageTypes</arg>
+ <arg>${falconInputFeedStorageTypes}</arg>
+ <capture-output/> <!-- makes the handler's output available to later nodes via wf:actionData -->
+ </java>
+ <ok to="user-action"/>
+ <error to="failed-post-processing"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/process/hive-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/hive-action.xml b/oozie/src/main/resources/action/process/hive-action.xml
new file mode 100644
index 0000000..0e20557
--- /dev/null
+++ b/oozie/src/main/resources/action/process/hive-action.xml
@@ -0,0 +1,37 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<action name="user-action" xmlns="uri:oozie:workflow:0.3"> <!-- User Hive action: runs the process's Hive script. The script element holds a placeholder token, presumably replaced with the user script path by the workflow builder - confirm. -->
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/hive-site.xml</job-xml> <!-- presumably the cluster Hive config staged with the workflow app - confirm -->
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>#USER_WF_PATH#</script>
+ </hive>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3bb5a62a/oozie/src/main/resources/action/process/oozie-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/process/oozie-action.xml b/oozie/src/main/resources/action/process/oozie-action.xml
new file mode 100644
index 0000000..b6fdec1
--- /dev/null
+++ b/oozie/src/main/resources/action/process/oozie-action.xml
@@ -0,0 +1,25 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<action name='user-action' xmlns="uri:oozie:workflow:0.3"> <!-- User Oozie workflow: runs the process's own workflow as a sub-workflow. The app-path holds a placeholder token, presumably replaced with the user workflow path by the workflow builder - confirm. -->
+ <sub-workflow>
+ <app-path>#USER_WF_PATH#</app-path>
+ <propagate-configuration/> <!-- passes this workflow's job configuration down to the sub-workflow -->
+ </sub-workflow>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+</action>