Posted to commits@falcon.apache.org by sh...@apache.org on 2014/07/10 08:57:36 UTC
[7/9] FALCON-369 Refactor workflow builder. Contributed by Shwetha GS
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
new file mode 100644
index 0000000..bb8dfcc
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.feed.FeedBundleBuilder;
+import org.apache.falcon.oozie.process.ProcessBundleBuilder;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Marshaller;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+/**
+ * Base class for building oozie entities - workflow, coordinator and bundle.
+ * @param <T> the entity type (feed or process) the builder operates on
+ */
+public abstract class OozieEntityBuilder<T extends Entity> {
+ public static final Logger LOG = LoggerFactory.getLogger(OozieEntityBuilder.class);
+
+ public static final String METASTOREURIS = "hive.metastore.uris";
+ public static final String METASTORE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
+ public static final String METASTORE_USE_THRIFT_SASL = "hive.metastore.sasl.enabled";
+
+ public static final String ENTITY_PATH = "ENTITY_PATH";
+ public static final String ENTITY_NAME = "ENTITY_NAME";
+
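+ // Filters jars whose names start with "falcon" when shared libs are pushed to HDFS;
+ // e.g. "falcon-common-0.5.jar" (illustrative file name) is accepted and resolves to
+ // the jar name "falcon-common-0.5".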
+ private static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("falcon");
+ }
+
+ @Override
+ public String getJarName(Path path) {
+ String name = path.getName();
+ if (name.endsWith(".jar")) {
+ name = name.substring(0, name.indexOf(".jar"));
+ }
+ return name;
+ }
+ };
+
+ protected T entity;
+ protected final boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
+
+ public OozieEntityBuilder(T entity) {
+ this.entity = entity;
+ }
+
+ public abstract Properties build(Cluster cluster, Path buildPath) throws FalconException;
+
+ protected String getStoragePath(Path path) {
+ if (path != null) {
+ return getStoragePath(path.toString());
+ }
+ return null;
+ }
+
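+ // Prefixes scheme-less paths with the ${nameNode} EL variable so oozie resolves them
+ // against the cluster's file system; e.g. "/projects/falcon/staging" (illustrative path)
+ // becomes "${nameNode}/projects/falcon/staging".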
+ protected String getStoragePath(String path) {
+ if (StringUtils.isNotEmpty(path)) {
+ if (new Path(path).toUri().getScheme() == null && !path.startsWith("${nameNode}")) {
+ path = "${nameNode}" + path;
+ }
+ }
+ return path;
+ }
+
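+ // Factory dispatch on entity type; an illustrative call site:
+ // Properties props = OozieEntityBuilder.get(feed).build(cluster, stagingPath);
+ // where feed, cluster and stagingPath are supplied by the caller.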
+ public static OozieEntityBuilder get(Entity entity) {
+ switch (entity.getEntityType()) {
+ case FEED:
+ return new FeedBundleBuilder((Feed) entity);
+
+ case PROCESS:
+ return new ProcessBundleBuilder((Process) entity);
+
+ default:
+ throw new IllegalArgumentException("Unhandled type: " + entity.getEntityType());
+ }
+ }
+
+ protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+ throws FalconException {
+ try {
+ Marshaller marshaller = jaxbContext.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ OutputStream out = fs.create(outPath);
+ try {
+ marshaller.marshal(jaxbElement, out);
+ } finally {
+ out.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ StringWriter writer = new StringWriter();
+ marshaller.marshal(jaxbElement, writer);
+ LOG.debug("Writing definition to {} on cluster {}", outPath, cluster.getName());
+ LOG.debug(writer.getBuffer().toString());
+ }
+
+ LOG.info("Marshalled {} to {}", jaxbElement.getDeclaredType(), outPath);
+ } catch (Exception e) {
+ throw new FalconException("Unable to marshall app object", e);
+ }
+ }
+
+ protected boolean isTableStorageType(Cluster cluster) throws FalconException {
+ return entity.getEntityType() == EntityType.PROCESS
+ ? isTableStorageType(cluster, (Process) entity) : isTableStorageType(cluster, (Feed) entity);
+ }
+
+ protected boolean isTableStorageType(Cluster cluster, Feed feed) throws FalconException {
+ Storage.TYPE storageType = FeedHelper.getStorageType(feed, cluster);
+ return Storage.TYPE.TABLE == storageType;
+ }
+
+ protected boolean isTableStorageType(Cluster cluster, Process process) throws FalconException {
+ Storage.TYPE storageType = ProcessHelper.getStorageType(cluster, process);
+ return Storage.TYPE.TABLE == storageType;
+ }
+
+ protected Properties getHiveCredentials(Cluster cluster) {
+ String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+ if (metaStoreUrl == null) {
+ throw new IllegalStateException(
+ "Registry interface is not defined in cluster: " + cluster.getName());
+ }
+
+ Properties hiveCredentials = new Properties();
+ hiveCredentials.put(METASTOREURIS, metaStoreUrl);
+ hiveCredentials.put("hive.metastore.execute.setugi", "true");
+ hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
+ hiveCredentials.put("hcat.metastore.uri", metaStoreUrl);
+
+ if (isSecurityEnabled) {
+ String principal = ClusterHelper
+ .getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL);
+ hiveCredentials.put(METASTORE_KERBEROS_PRINCIPAL, principal);
+ hiveCredentials.put(METASTORE_USE_THRIFT_SASL, "true");
+ hiveCredentials.put("hcat.metastore.principal", principal);
+ }
+
+ return hiveCredentials;
+ }
+
+ protected Configuration getHiveCredentialsAsConf(Cluster cluster) {
+ Properties hiveCredentials = getHiveCredentials(cluster);
+
+ Configuration hiveConf = new Configuration(false);
+ for (Entry<Object, Object> entry : hiveCredentials.entrySet()) {
+ hiveConf.set((String)entry.getKey(), (String)entry.getValue());
+ }
+
+ return hiveConf;
+ }
+
+ protected Properties getEntityProperties(Entity myEntity) {
+ Properties properties = new Properties();
+ switch (myEntity.getEntityType()) {
+ case CLUSTER:
+ org.apache.falcon.entity.v0.cluster.Properties clusterProps = ((Cluster) myEntity).getProperties();
+ if (clusterProps != null) {
+ for (Property prop : clusterProps.getProperties()) {
+ properties.put(prop.getName(), prop.getValue());
+ }
+ }
+ break;
+
+ case FEED:
+ org.apache.falcon.entity.v0.feed.Properties feedProps = ((Feed) myEntity).getProperties();
+ if (feedProps != null) {
+ for (org.apache.falcon.entity.v0.feed.Property prop : feedProps.getProperties()) {
+ properties.put(prop.getName(), prop.getValue());
+ }
+ }
+ break;
+
+ case PROCESS:
+ org.apache.falcon.entity.v0.process.Properties processProps = ((Process) myEntity).getProperties();
+ if (processProps != null) {
+ for (org.apache.falcon.entity.v0.process.Property prop : processProps.getProperties()) {
+ properties.put(prop.getName(), prop.getValue());
+ }
+ }
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unhandled entity type " + myEntity.getEntityType());
+ }
+ return properties;
+ }
+
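+ // For an input named, say, "clicks" (illustrative), this publishes properties such as
+ // falcon_clicks_partition_filter_pig/hive/java and falcon_clicks_datain_partitions_hive,
+ // which oozie resolves through the coordinator EL functions below.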
+ protected void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage, Properties props) {
+ String prefix = "falcon_" + input.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_partition_filter_pig",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+ props.put(prefix + "_partition_filter_hive",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+ props.put(prefix + "_partition_filter_java",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+ props.put(prefix + "_datain_partitions_hive",
+ "${coord:dataInPartitions('" + input.getName() + "', 'hive-export')}");
+ }
+
+ protected void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage, Properties props) {
+ String prefix = "falcon_" + output.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ //pig and java actions require partition expression as "key1=val1, key2=val2"
+ props.put(prefix + "_partitions_pig",
+ "${coord:dataOutPartitions('" + output.getName() + "')}");
+ props.put(prefix + "_partitions_java",
+ "${coord:dataOutPartitions('" + output.getName() + "')}");
+
+ //hive requires partition expression as "key1='val1', key2='val2'" (with quotes around values)
+ //there is no direct EL expression in oozie
+ List<String> partitions = new ArrayList<String>();
+ for (String key : tableStorage.getDatedPartitionKeys()) {
+ StringBuilder expr = new StringBuilder();
+ expr.append("${coord:dataOutPartitionValue('").append(output.getName()).append("', '").append(key)
+ .append("')}");
+ props.put(prefix + "_dated_partition_value_" + key, expr.toString());
+ partitions.add(key + "='" + expr + "'");
+ }
+ props.put(prefix + "_partitions_hive", StringUtils.join(partitions, ","));
+ }
+
+ protected void propagateCommonCatalogTableProperties(CatalogStorage tableStorage, Properties props, String prefix) {
+ props.put(prefix + "_storage_type", tableStorage.getType().name());
+ props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+ props.put(prefix + "_database", tableStorage.getDatabase());
+ props.put(prefix + "_table", tableStorage.getTable());
+ }
+
+ protected void copySharedLibs(Cluster cluster, Path libPath) throws FalconException {
+ try {
+ SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+ libPath, cluster, FALCON_JAR_FILTER);
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+ }
+ }
+
+ protected Properties getProperties(Path path, String name) {
+ if (path == null) {
+ return null;
+ }
+
+ Properties prop = new Properties();
+ prop.setProperty(OozieEntityBuilder.ENTITY_PATH, getStoragePath(path));
+ prop.setProperty(OozieEntityBuilder.ENTITY_NAME, name);
+ return prop;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
new file mode 100644
index 0000000..ac78297
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.feed.FeedReplicationWorkflowBuilder;
+import org.apache.falcon.oozie.feed.FeedRetentionWorkflowBuilder;
+import org.apache.falcon.oozie.process.HiveProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.OozieProcessWorkflowBuilder;
+import org.apache.falcon.oozie.process.PigProcessWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CREDENTIAL;
+import org.apache.falcon.oozie.workflow.CREDENTIALS;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Base class for building orchestration workflows in oozie.
+ * @param <T> the entity type the workflow is built for
+ */
+public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
+ protected static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";
+ public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+ Arrays.asList("recordsize", "succeeded-post-processing", "failed-post-processing"));
+ private final Tag lifecycle;
+
+ public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
+ super(entity);
+ this.lifecycle = lifecycle;
+ }
+
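+ // Factory dispatch on entity type and lifecycle; an illustrative call:
+ // OozieOrchestrationWorkflowBuilder.get(feed, Tag.REPLICATION).build(cluster, buildPath);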
+ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Tag lifecycle) {
+ switch(entity.getEntityType()) {
+ case FEED:
+ Feed feed = (Feed) entity;
+ switch (lifecycle) {
+ case RETENTION:
+ return new FeedRetentionWorkflowBuilder(feed);
+
+ case REPLICATION:
+ return new FeedReplicationWorkflowBuilder(feed);
+
+ default:
+ throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle "
+ + lifecycle);
+ }
+
+ case PROCESS:
+ Process process = (Process) entity;
+ switch(process.getWorkflow().getEngine()) {
+ case PIG:
+ return new PigProcessWorkflowBuilder(process);
+
+ case OOZIE:
+ return new OozieProcessWorkflowBuilder(process);
+
+ case HIVE:
+ return new HiveProcessWorkflowBuilder(process);
+
+ default:
+ break;
+ }
+ break;
+
+ default:
+ break;
+ }
+
+ throw new IllegalArgumentException("Unhandled type " + entity.getEntityType() + ", lifecycle " + lifecycle);
+ }
+
+ protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+ marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+ OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+ }
+
+ protected WORKFLOWAPP getWorkflow(String template) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieOrchestrationWorkflowBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(resourceAsStream);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
+
+ protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag)
+ throws FalconException {
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
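+ // Extension jars are picked up from libext, libext/<ENTITY_TYPE> and
+ // libext/<ENTITY_TYPE>/<tag>, e.g. libext/FEED/replication for a feed
+ // replication workflow (illustrative layout).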
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+ try {
+ addExtensionJars(fs, new Path(libext), wf);
+ addExtensionJars(fs, new Path(libext, entity.getEntityType().name()), wf);
+ if (tag != null) {
+ addExtensionJars(fs,
+ new Path(libext, entity.getEntityType().name() + "/" + tag.name().toLowerCase()), wf);
+ }
+ } catch(IOException e) {
+ throw new FalconException(e);
+ }
+ }
+
+ private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+ FileStatus[] libs = null;
+ try {
+ libs = fs.listStatus(path);
+ } catch(FileNotFoundException ignore) {
+ //Ok if the libext is not configured
+ }
+
+ if (libs == null) {
+ return;
+ }
+
+ for(FileStatus lib : libs) {
+ if (lib.isDir()) {
+ continue;
+ }
+
+ for(Object obj: wf.getDecisionOrForkOrJoin()) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ List<String> files = null;
+ if (action.getJava() != null) {
+ files = action.getJava().getFile();
+ } else if (action.getPig() != null) {
+ files = action.getPig().getFile();
+ } else if (action.getMapReduce() != null) {
+ files = action.getMapReduce().getFile();
+ }
+ if (files != null) {
+ files.add(lib.getPath().toString());
+ }
+ }
+ }
+ }
+
+ // Creates hive-site.xml for the given cluster under the conf dir of the workflow path.
+ protected void createHiveConfiguration(Cluster cluster, Path workflowPath, String prefix) throws FalconException {
+ Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+
+ try {
+ Configuration conf = ClusterHelper.getConfiguration(cluster);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+
+ // create hive conf to stagingDir
+ Path confPath = new Path(workflowPath, "conf");
+
+ persistHiveConfiguration(fs, confPath, hiveConf, prefix);
+ } catch (IOException e) {
+ throw new FalconException("Unable to create create hive site", e);
+ }
+ }
+
+ private void persistHiveConfiguration(FileSystem fs, Path confPath, Configuration hiveConf,
+ String prefix) throws IOException {
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+ hiveConf.writeXml(out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ /**
+ * Adds an HCatalog credential to the workflow. This is only necessary if table storage
+ * is involved and security (kerberos) is enabled.
+ *
+ * @param workflowApp workflow xml
+ * @param cluster cluster entity
+ * @param credentialName credential name
+ */
+ protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster, String credentialName) {
+ CREDENTIALS credentials = workflowApp.getCredentials();
+ if (credentials == null) {
+ credentials = new CREDENTIALS();
+ }
+
+ credentials.getCredential().add(createHCatalogCredential(cluster, credentialName));
+
+ // add credential for workflow
+ workflowApp.setCredentials(credentials);
+ }
+
+ /**
+ * Adds an HCatalog credential to the workflow and to each of the named actions. This is
+ * only necessary if table storage is involved and security (kerberos) is enabled.
+ *
+ * @param workflowApp workflow xml
+ * @param cluster cluster entity
+ * @param credentialName credential name
+ * @param actions names of the actions to decorate with the credential
+ */
+ protected void addHCatalogCredentials(WORKFLOWAPP workflowApp, Cluster cluster,
+ String credentialName, Set<String> actions) {
+ addHCatalogCredentials(workflowApp, cluster, credentialName);
+
+ // add credential to each action
+ for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof ACTION)) {
+ continue;
+ }
+
+ ACTION action = (ACTION) object;
+ String actionName = action.getName();
+ if (actions.contains(actionName)) {
+ action.setCred(credentialName);
+ }
+ }
+ }
+
+ /**
+ * Creates an HCatalog credential. This is only necessary if table storage is involved
+ * and security (kerberos) is enabled.
+ *
+ * @param cluster cluster entity
+ * @param credentialName credential name
+ * @return CREDENTIAL object
+ */
+ private CREDENTIAL createHCatalogCredential(Cluster cluster, String credentialName) {
+ final String metaStoreUrl = ClusterHelper.getRegistryEndPoint(cluster);
+
+ CREDENTIAL credential = new CREDENTIAL();
+ credential.setName(credentialName);
+ credential.setType("hcat");
+
+ credential.getProperty().add(createProperty("hcat.metastore.uri", metaStoreUrl));
+ credential.getProperty().add(createProperty("hcat.metastore.principal",
+ ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL)));
+
+ return credential;
+ }
+
+ private CREDENTIAL.Property createProperty(String name, String value) {
+ CREDENTIAL.Property property = new CREDENTIAL.Property();
+ property.setName(name);
+ property.setValue(value);
+ return property;
+ }
+
+ protected void addOozieRetries(WORKFLOWAPP workflow) {
+ for (Object object : workflow.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+ org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if (FALCON_ACTIONS.contains(actionName)) {
+ decorateWithOozieRetries(action);
+ }
+ }
+ }
+
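+ // Retry knobs come from runtime properties and can be overridden per deployment, e.g.
+ // falcon.parentworkflow.retry.max=3
+ // falcon.parentworkflow.retry.interval.secs=1
+ // (defaults shown below).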
+ protected void decorateWithOozieRetries(ACTION action) {
+ Properties props = RuntimeProperties.get();
+ action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+ action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
new file mode 100644
index 0000000..6917f4e
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedBundleBuilder.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieBundleBuilder;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.hadoop.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds oozie bundle for the feed.
+ */
+public class FeedBundleBuilder extends OozieBundleBuilder<Feed> {
+ public FeedBundleBuilder(Feed entity) {
+ super(entity);
+ }
+
+ @Override protected Path getLibPath(Cluster cluster, Path buildPath) {
+ return new Path(buildPath, "lib");
+ }
+
+ @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+ List<Properties> props = new ArrayList<Properties>();
+ List<Properties> evictionProps =
+ OozieCoordinatorBuilder.get(entity, Tag.RETENTION).buildCoords(cluster, buildPath);
+ if (evictionProps != null) {
+ props.addAll(evictionProps);
+ }
+
+ List<Properties> replicationProps = OozieCoordinatorBuilder.get(entity, Tag.REPLICATION).buildCoords(cluster,
+ buildPath);
+ if (replicationProps != null) {
+ props.addAll(replicationProps);
+ }
+
+ if (!props.isEmpty()) {
+ copySharedLibs(cluster, getLibPath(cluster, buildPath));
+ }
+
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
new file mode 100644
index 0000000..3226cf2
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds oozie coordinator for feed replication, one per source-target cluster combination.
+ */
+public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+ private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
+ private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+ private static final String PARALLEL = "parallel";
+ private static final String TIMEOUT = "timeout";
+ private static final String MR_MAX_MAPS = "maxMaps";
+ private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
+
+ public FeedReplicationCoordinatorBuilder(Feed entity) {
+ super(entity, Tag.REPLICATION);
+ }
+
+ @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+ if (feedCluster.getType() == ClusterType.TARGET) {
+ List<Properties> props = new ArrayList<Properties>();
+ OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, Tag.REPLICATION);
+ for (org.apache.falcon.entity.v0.feed.Cluster srcFeedCluster : entity.getClusters().getClusters()) {
+
+ if (srcFeedCluster.getType() == ClusterType.SOURCE) {
+ Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, srcFeedCluster.getName());
+ // workflow is serialized to a specific dir
+ Path coordPath = new Path(buildPath, Tag.REPLICATION.name() + "/" + srcCluster.getName());
+
+ // Different workflow for each source since hive credentials vary for each cluster
+ builder.build(cluster, coordPath);
+
+ Properties coordProps = doBuild(srcCluster, cluster, coordPath);
+ if (coordProps != null) { // doBuild returns null when the validity windows don't overlap
+ props.add(coordProps);
+ }
+ }
+ }
+ return props;
+ }
+ return null;
+ }
+
+ private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+ long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
+ Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
+ Date sourceEndDate = getEndDate(srcCluster);
+
+ Date targetStartDate = getStartDate(trgCluster, replicationDelayInMillis);
+ Date targetEndDate = getEndDate(trgCluster);
+
+ if (noOverlapExists(sourceStartDate, sourceEndDate,
+ targetStartDate, targetEndDate)) {
+ LOG.warn("Not creating replication coordinator, as the source cluster: {} and target cluster: {} do "
+ + "not have overlapping dates", srcCluster.getName(), trgCluster.getName());
+ return null;
+ }
+
+ COORDINATORAPP coord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+
+ String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
+ entity).toString();
+ String start = sourceStartDate.after(targetStartDate)
+ ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+ String end = sourceEndDate.before(targetEndDate)
+ ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+ initializeCoordAttributes(coord, coordName, start, end, replicationDelayInMillis);
+ setCoordControls(coord);
+
+ final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, entity);
+ initializeInputDataSet(srcCluster, coord, sourceStorage);
+
+ final Storage targetStorage = FeedHelper.createStorage(trgCluster, entity);
+ initializeOutputDataSet(trgCluster, coord, targetStorage);
+
+ ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+ srcCluster, trgCluster, buildPath, coordName, sourceStorage, targetStorage);
+ coord.setAction(replicationWorkflowAction);
+
+ marshal(trgCluster, coord, buildPath);
+ return getProperties(buildPath, coordName);
+ }
+
+ private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
+ String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
+ ACTION action = new ACTION();
+ WORKFLOW workflow = new WORKFLOW();
+
+ workflow.setAppPath(getStoragePath(buildPath.toString()));
+ Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+ props.put("srcClusterName", srcCluster.getName());
+ props.put("srcClusterColo", srcCluster.getColo());
+ if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+ props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+ }
+ if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
+ props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
+ }
+
+ // the storage type is uniform across source and target feeds for replication
+ props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+ String instancePaths = "";
+ if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster);
+ instancePaths = pathsWithPartitions;
+
+ propagateFileSystemCopyProperties(pathsWithPartitions, props);
+ } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+ instancePaths = "${coord:dataIn('input')}";
+ final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+ propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+ final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+ propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+ propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
+ setupHiveConfiguration(srcCluster, trgCluster, buildPath);
+ }
+
+ propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
+ props.putAll(FeedHelper.getUserWorkflowProperties("replication"));
+
+ workflow.setConfiguration(getConfig(props));
+ action.setWorkflow(workflow);
+
+ return action;
+ }
+
+ private String getDefaultMaxMaps() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+ }
+
+ private String getDefaultMapBandwidth() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400");
+ }
+
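+ // Appends any cluster partition expressions to the input path; e.g. with a source
+ // partition of "us" (illustrative) the result is roughly "${coord:dataIn('input')}/us".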
+ private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException {
+ String srcPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(entity, srcCluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
+ String targetPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(entity, trgCluster.getName()).getPartition());
+ targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+ StringBuilder pathsWithPartitions = new StringBuilder();
+ pathsWithPartitions.append("${coord:dataIn('input')}/")
+ .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+ String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+ parts = StringUtils.stripEnd(parts, "/");
+ return parts;
+ }
+
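+ // The distcp source/target EL expressions below are resolved by oozie against the
+ // 'input' and 'output' coordinator datasets initialized later in this builder.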
+ private void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
+ props.put("sourceRelativePaths", paths);
+
+ props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+ props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+ }
+
+ private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+ Properties props, String prefix) {
+ props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+ props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+ props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+ props.put(prefix + "Database", tableStorage.getDatabase());
+ props.put(prefix + "Table", tableStorage.getTable());
+ props.put(prefix + "Partition", "${coord:dataInPartitions('input', 'hive-export')}");
+ }
+
+ private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage, Properties props) {
+ // create staging dirs for export at source & set it as distcpSourcePaths
+ String sourceStagingPath =
+ FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
+ NOMINAL_TIME_EL + "/" + trgCluster.getName());
+ props.put("distcpSourcePaths", sourceStagingPath);
+
+ // create staging dirs for import at target & set it as distcpTargetPaths
+ String targetStagingPath =
+ FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
+ NOMINAL_TIME_EL + "/" + trgCluster.getName());
+ props.put("distcpTargetPaths", targetStagingPath);
+
+ props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
+ }
+
+ private void propagateLateDataProperties(String instancePaths, String falconFeedStorageType, Properties props) {
+ // todo these pairs are the same but used in different context
+ // late data handler - should-record action
+ props.put("falconInputFeeds", entity.getName());
+ props.put("falconInPaths", instancePaths);
+
+ // storage type for each corresponding feed - in this case only one feed is involved
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+ // falcon post processing
+ props.put(ARG.feedNames.getPropName(), entity.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+ }
+
+ private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
+ Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
+
+ try {
+ // copy import export scripts to stagingDir
+ Path scriptPath = new Path(buildPath, "scripts");
+ copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-export.hql");
+ copyHiveScript(fs, scriptPath, "/workflow/", "falcon-table-import.hql");
+
+ // create hive conf to stagingDir
+ Path confPath = new Path(buildPath, "conf");
+ persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
+ persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
+ } catch (IOException e) {
+ throw new FalconException("Unable to create hive conf files", e);
+ }
+ }
+
+ private void copyHiveScript(FileSystem fs, Path scriptPath, String localScriptPath,
+ String scriptName) throws IOException {
+ OutputStream out = null;
+ InputStream in = null;
+ try {
+ out = fs.create(new Path(scriptPath, scriptName));
+ in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ protected void persistHiveConfiguration(FileSystem fs, Path confPath,
+ Cluster cluster, String prefix) throws IOException {
+ Configuration hiveConf = getHiveCredentialsAsConf(cluster);
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+ hiveConf.writeXml(out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ private void initializeCoordAttributes(COORDINATORAPP coord, String coordName, String start, String end,
+ long delayInMillis) {
+ coord.setName(coordName);
+ coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+
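+ // A positive replication delay shifts instance resolution into the past; e.g. a delay
+ // of one hour (illustrative) gives delayInMins = -60 and an EL expression "${now(0,-60)}".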
+ if (delayInMillis > 0) {
+ long delayInMins = -1 * delayInMillis / (1000 * 60);
+ String elExp = "${now(0," + delayInMins + ")}";
+
+ coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+ coord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+ }
+
+ coord.setStart(start);
+ coord.setEnd(end);
+ coord.setTimezone(entity.getTimezone().getID());
+ }
+
+ private void setCoordControls(COORDINATORAPP coord) throws FalconException {
+ long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
+ long timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+
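+ // Illustrative sizing: for an hourly feed, timeout defaults to 6 hours and, unless
+ // overridden, throttle = timeout / frequency * 2 = 12 concurrent materializations.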
+ Properties props = getEntityProperties(entity);
+ String timeout = props.getProperty(TIMEOUT);
+ if (timeout != null) {
+ try {
+ timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+ } catch (Exception ignore) {
+ LOG.error("Unable to evaluate timeout:", ignore);
+ }
+ }
+ coord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+ coord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+ String parallelProp = props.getProperty(PARALLEL);
+ int parallel = 1;
+ if (parallelProp != null) {
+ try {
+ parallel = Integer.parseInt(parallelProp);
+ } catch (NumberFormatException ignore) {
+ LOG.error("Unable to parse parallel:", ignore);
+ }
+ }
+ coord.getControls().setConcurrency(String.valueOf(parallel));
+ }
+
+
+ private void initializeInputDataSet(Cluster cluster, COORDINATORAPP coord, Storage storage) throws FalconException {
+ SYNCDATASET inputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+ String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ inputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(inputDataset, cluster);
+
+ if (entity.getAvailabilityFlag() == null) {
+ inputDataset.setDoneFlag("");
+ } else {
+ inputDataset.setDoneFlag(entity.getAvailabilityFlag());
+ }
+ }
+
+ private void initializeOutputDataSet(Cluster cluster, COORDINATORAPP coord,
+ Storage storage) throws FalconException {
+ SYNCDATASET outputDataset = (SYNCDATASET)coord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+ String uriTemplate = storage.getUriTemplate(LocationType.DATA);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ outputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(outputDataset, cluster);
+ }
+
+ private void setDatasetValues(SYNCDATASET dataset, Cluster cluster) {
+ dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+ FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart()));
+ dataset.setTimezone(entity.getTimezone().getID());
+ dataset.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
+ }
+
+ private long getReplicationDelayInMillis(Cluster srcCluster) throws FalconException {
+ Frequency replicationDelay = FeedHelper.getCluster(entity, srcCluster.getName()).getDelay();
+ long delayInMillis = 0;
+ if (replicationDelay != null) {
+ delayInMillis = ExpressionHelper.get().evaluate(
+ replicationDelay.toString(), Long.class);
+ }
+
+ return delayInMillis;
+ }
+
+ private Date getStartDate(Cluster cluster, long replicationDelayInMillis) {
+ Date startDate = FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart();
+ return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+ }
+
+ private Date getEndDate(Cluster cluster) {
+ return FeedHelper.getCluster(entity, cluster.getName()).getValidity().getEnd();
+ }
+
+ private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+ Date targetStartDate, Date targetEndDate) {
+ return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
new file mode 100644
index 0000000..00fab99
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds feed replication workflow, one per source-target cluster combination.
+ */
+public class FeedReplicationWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+ private static final String REPLICATION_WF_TEMPLATE = "/workflow/replication-workflow.xml";
+ private static final String SOURCE_HIVE_CREDENTIAL_NAME = "falconSourceHiveAuth";
+ private static final String TARGET_HIVE_CREDENTIAL_NAME = "falconTargetHiveAuth";
+
+ public FeedReplicationWorkflowBuilder(Feed entity) {
+ super(entity, Tag.REPLICATION);
+ }
+
+ @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+ WORKFLOWAPP workflow = getWorkflow(REPLICATION_WF_TEMPLATE);
+ Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, buildPath.getName());
+ String wfName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+ workflow.setName(wfName);
+
+ addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
+
+ addOozieRetries(workflow);
+
+ if (isTableStorageType(cluster)) {
+ setupHiveCredentials(cluster, srcCluster, workflow);
+ }
+
+ marshal(cluster, workflow, buildPath);
+
+ return getProperties(buildPath, wfName);
+ }
+
+ private void setupHiveCredentials(Cluster targetCluster, Cluster sourceCluster, WORKFLOWAPP workflowApp) {
+ if (isSecurityEnabled) {
+ // add hcatalog credentials for secure mode and add a reference to each action
+ addHCatalogCredentials(workflowApp, sourceCluster, SOURCE_HIVE_CREDENTIAL_NAME);
+ addHCatalogCredentials(workflowApp, targetCluster, TARGET_HIVE_CREDENTIAL_NAME);
+ }
+
+ // hive-site.xml is created later, during coordinator initialization, but the
+ // actions are pointed at it here
+
+ for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+
+ org.apache.falcon.oozie.workflow.ACTION action =
+ (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if ("recordsize".equals(actionName)) {
+ // add reference to hive-site conf to each action
+ action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
+
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+ }
+ } else if ("table-export".equals(actionName)) {
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
+ }
+ } else if ("table-import".equals(actionName)) {
+ if (isSecurityEnabled) { // add a reference to credential in the action
+ action.setCred(TARGET_HIVE_CREDENTIAL_NAME);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
new file mode 100644
index 0000000..4393c94
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.oozie.OozieEntityBuilder;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Builds feed retention coordinator.
+ */
+public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
+ public FeedRetentionCoordinatorBuilder(Feed entity) {
+ super(entity, Tag.RETENTION);
+ }
+
+ @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+
+ if (feedCluster.getValidity().getEnd().before(new Date())) {
+ LOG.warn("Feed Retention is not applicable as Feed's end time for cluster {} is not in the future",
+ cluster.getName());
+ return null;
+ }
+
+ COORDINATORAPP coord = new COORDINATORAPP();
+ String coordName = getEntityName();
+ coord.setName(coordName);
+ coord.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+ coord.setStart(SchemaHelper.formatDateUTC(new Date()));
+ coord.setTimezone(entity.getTimezone().getID());
+ TimeUnit timeUnit = entity.getFrequency().getTimeUnit();
+ if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+ coord.setFrequency("${coord:hours(6)}");
+ } else {
+ coord.setFrequency("${coord:days(1)}");
+ }
+
+ Path coordPath = getBuildPath(buildPath);
+ Properties props = createCoordDefaultConfiguration(cluster, coordName);
+ props.put("timeZone", entity.getTimezone().getID());
+ props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+ final Storage storage = FeedHelper.createStorage(cluster, entity);
+ props.put("falconFeedStorageType", storage.getType().name());
+
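+ // EL-style markers are escaped so the eviction workflow can expand instances itself;
+ // e.g. "${YEAR}" becomes "?{YEAR}" (assuming the usual regex constants in Storage).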
+ String feedDataPath = storage.getUriTemplate();
+ props.put("feedDataPath",
+ feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+ props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+ props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+ props.put(ARG.feedNames.getPropName(), entity.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), IGNORE);
+
+ props.put("falconInputFeeds", entity.getName());
+ props.put("falconInPaths", IGNORE);
+
+ props.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
+
+ WORKFLOW workflow = new WORKFLOW();
+ Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, Tag.RETENTION).build(cluster, coordPath);
+ workflow.setAppPath(getStoragePath(wfProp.getProperty(OozieEntityBuilder.ENTITY_PATH)));
+ workflow.setConfiguration(getConfig(props));
+ ACTION action = new ACTION();
+ action.setWorkflow(workflow);
+
+ coord.setAction(action);
+
+ marshal(cluster, coord, coordPath);
+
+ return Arrays.asList(getProperties(coordPath, coordName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
new file mode 100644
index 0000000..4a7f96b
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.feed;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.hadoop.fs.Path;
+
+import java.util.Properties;
+
+/**
+ * Builds feed retention workflow.
+ */
+public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuilder<Feed> {
+ private static final String RETENTION_WF_TEMPLATE = "/workflow/retention-workflow.xml";
+
+ public FeedRetentionWorkflowBuilder(Feed entity) {
+ super(entity, Tag.DEFAULT);
+ }
+
+ @Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
+ WORKFLOWAPP workflow = getWorkflow(RETENTION_WF_TEMPLATE);
+ String wfName = EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString();
+ workflow.setName(wfName);
+ addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION);
+ addOozieRetries(workflow);
+
+ if (isTableStorageType(cluster)) {
+ setupHiveCredentials(cluster, buildPath, workflow);
+ }
+
+ marshal(cluster, workflow, buildPath);
+ return getProperties(buildPath, wfName);
+ }
+
+ private void setupHiveCredentials(Cluster cluster, Path wfPath,
+ WORKFLOWAPP workflowApp) throws FalconException {
+ if (isSecurityEnabled) {
+ // add hcatalog credentials for secure mode and add a reference to each action
+ addHCatalogCredentials(workflowApp, cluster, HIVE_CREDENTIAL_NAME);
+ }
+
+ // create hive-site.xml file so actions can use it in the classpath
+ createHiveConfiguration(cluster, wfPath, ""); // no prefix since only one hive instance
+
+ for (Object object : workflowApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+
+ org.apache.falcon.oozie.workflow.ACTION action =
+ (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if ("eviction".equals(actionName)) {
+ // add the hive-site conf to the eviction action's classpath
+ action.getJava().setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+ if (isSecurityEnabled) {
+ // add a reference to credential in the action
+ action.setCred(HIVE_CREDENTIAL_NAME);
+ }
+ }
+ }
+ }
+
+}
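
[Editorial note, not part of the patch] For orientation, a minimal sketch of how this builder is reached, mirroring the factory call used by the retention coordinator earlier in this patch. Only the get()/build() calls and the ENTITY_PATH property are taken from the patch; the feed, cluster and staging path are hypothetical:

    // Sketch: the factory presumably returns FeedRetentionWorkflowBuilder for a
    // feed with Tag.RETENTION; build() marshals the workflow under the given
    // staging path and returns its properties.
    // 'feed' and 'cluster' are assumed to be entities already loaded from the store.
    Properties wfProps = OozieOrchestrationWorkflowBuilder
        .get(feed, Tag.RETENTION)
        .build(cluster, new Path("/projects/falcon/staging/RETENTION"));  // hypothetical path
    String appPath = wfProps.getProperty(OozieEntityBuilder.ENTITY_PATH); // workflow app path, as read by the coordinator above
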
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
new file mode 100644
index 0000000..79a1883
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/HiveProcessWorkflowBuilder.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.hive.CONFIGURATION.Property;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.Path;
+
+import javax.xml.bind.JAXBElement;
+import java.util.List;
+
+/**
+ * Builds the orchestration workflow for a process whose engine is hive.
+ */
+public class HiveProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+ public HiveProcessWorkflowBuilder(Process entity) {
+ super(entity);
+ }
+
+ @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+ if (!"user-hive-job".equals(action.getName())) {
+ return;
+ }
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(action);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+ hiveAction.setScript(getStoragePath(userWfPath));
+
+ addPrepareDeleteOutputPath(hiveAction);
+
+ final List<String> paramList = hiveAction.getParam();
+ addInputFeedsAsParams(paramList, cluster);
+ addOutputFeedsAsParams(paramList, cluster);
+
+ propagateEntityProperties(hiveAction);
+
+ // adds hive-site.xml to the hive action's classpath
+ hiveAction.setJobXml("${wf:appPath()}/conf/hive-site.xml");
+
+ addArchiveForCustomJars(cluster, hiveAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
+ buildPath));
+
+ OozieUtils.marshalHiveAction(action, actionJaxbElement);
+ }
+
+ private void propagateEntityProperties(org.apache.falcon.oozie.hive.ACTION hiveAction) {
+ CONFIGURATION conf = new CONFIGURATION();
+ super.propagateEntityProperties(conf, hiveAction.getParam());
+
+ List<Property> hiveConf = hiveAction.getConfiguration().getProperty();
+ for (CONFIGURATION.Property prop : conf.getProperty()) {
+ Property hiveProp = new Property();
+ hiveProp.setName(prop.getName());
+ hiveProp.setValue(prop.getValue());
+ hiveConf.add(hiveProp);
+ }
+ }
+
+ private void addPrepareDeleteOutputPath(org.apache.falcon.oozie.hive.ACTION hiveAction) throws FalconException {
+
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
+ org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+ List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+ for (String deletePath : deleteOutputPathList) {
+ org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+ delete.setPath(deletePath);
+ deleteList.add(delete);
+ }
+
+ if (!deleteList.isEmpty()) {
+ hiveAction.setPrepare(prepare);
+ }
+ }
+}
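
[Editorial note, not part of the patch] A word on the JAXB round trip in decorateAction(): the hive action body lives in its own schema/namespace, so it is presumably surfaced to the generic workflow ACTION as untyped content. The builder therefore unmarshals it into the typed hive ACTION, mutates it, and marshals it back. Condensed sketch (the OozieUtils calls are as used in this patch; the script path is hypothetical):

    // Pull the typed hive action out of the generic workflow ACTION,
    // mutate it, then write it back into the ACTION's content.
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> el = OozieUtils.unMarshalHiveAction(action);
    org.apache.falcon.oozie.hive.ACTION hive = el.getValue();
    hive.setScript("hdfs://nn:8020/apps/myprocess/script.hql");  // hypothetical script path
    OozieUtils.marshalHiveAction(action, el);
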
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
new file mode 100644
index 0000000..977d8c1
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilder.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Builds the orchestration workflow for a process whose engine is oozie.
+ */
+public class OozieProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+ public OozieProcessWorkflowBuilder(Process entity) {
+ super(entity);
+ }
+
+ @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+ if (!"user-oozie-workflow".equals(action.getName())) {
+ return;
+ }
+ action.getSubWorkflow().setAppPath(getStoragePath(ProcessHelper.getUserWorkflowPath(entity, cluster,
+ buildPath)));
+ }
+}
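
[Editorial note, not part of the patch] The app path set here presumably resolves to the copy of the user's workflow that ProcessBundleBuilder.copyUserWorkflow() (later in this patch) stages under the bundle's build path, so the sub-workflow action runs the staged copy rather than the user's original location.
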
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
new file mode 100644
index 0000000..29f601d
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/PigProcessWorkflowBuilder.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Builds the orchestration workflow for a process whose engine is pig.
+ */
+public class PigProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
+
+ public PigProcessWorkflowBuilder(Process entity) {
+ super(entity);
+ }
+
+ @Override protected void decorateAction(ACTION action, Cluster cluster, Path buildPath) throws FalconException {
+ if (!"user-pig-job".equals(action.getName())) {
+ return;
+ }
+
+ PIG pigAction = action.getPig();
+ Path userWfPath = ProcessHelper.getUserWorkflowPath(entity, cluster, buildPath);
+ pigAction.setScript(getStoragePath(userWfPath));
+
+ addPrepareDeleteOutputPath(pigAction);
+
+ final List<String> paramList = pigAction.getParam();
+ addInputFeedsAsParams(paramList, cluster);
+ addOutputFeedsAsParams(paramList, cluster);
+
+ propagateEntityProperties(pigAction.getConfiguration(), pigAction.getParam());
+
+ if (isTableStorageType(cluster)) { // adds hive-site.xml to the pig classpath
+ pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+ }
+
+ addArchiveForCustomJars(cluster, pigAction.getArchive(), ProcessHelper.getUserLibPath(entity, cluster,
+ buildPath));
+ }
+
+ private void addPrepareDeleteOutputPath(PIG pigAction) throws FalconException {
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList();
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
+ final PREPARE prepare = new PREPARE();
+ final List<DELETE> deleteList = prepare.getDelete();
+
+ for (String deletePath : deleteOutputPathList) {
+ final DELETE delete = new DELETE();
+ delete.setPath(deletePath);
+ deleteList.add(delete);
+ }
+
+ if (!deleteList.isEmpty()) {
+ pigAction.setPrepare(prepare);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/185b5888/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
new file mode 100644
index 0000000..86cea93
--- /dev/null
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessBundleBuilder.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.oozie.process;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.oozie.OozieBundleBuilder;
+import org.apache.falcon.oozie.OozieCoordinatorBuilder;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.CoordinatorJob.Timeunit;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Builds the oozie bundle for a process - the schedulable entity in oozie.
+ */
+public class ProcessBundleBuilder extends OozieBundleBuilder<Process> {
+
+ public ProcessBundleBuilder(Process entity) {
+ super(entity);
+ }
+
+ @Override protected Properties getAdditionalProperties(Cluster cluster) throws FalconException {
+ Properties properties = new Properties();
+ if (entity.getInputs() != null) {
+ for (Input in : entity.getInputs().getInputs()) {
+ if (in.isOptional()) {
+ properties.putAll(getOptionalInputProperties(in, cluster.getName()));
+ }
+ }
+ }
+ return properties;
+ }
+
+ private Properties getOptionalInputProperties(Input in, String clusterName) throws FalconException {
+ Properties properties = new Properties();
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, in.getFeed());
+ org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, clusterName);
+ String inName = in.getName();
+ properties.put(inName + ".frequency", String.valueOf(feed.getFrequency().getFrequency()));
+ properties.put(inName + ".freq_timeunit", mapToCoordTimeUnit(feed.getFrequency().getTimeUnit()).name());
+ properties.put(inName + ".timezone", feed.getTimezone().getID());
+ properties.put(inName + ".end_of_duration", Timeunit.NONE.name());
+ properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
+ properties.put(inName + ".done-flag", "notused");
+
+ String locPath = FeedHelper.createStorage(clusterName, feed)
+ .getUriTemplate(LocationType.DATA).replace('$', '%');
+ properties.put(inName + ".uri-template", locPath);
+
+ properties.put(inName + ".start-instance", in.getStart());
+ properties.put(inName + ".end-instance", in.getEnd());
+ return properties;
+ }
+
+ private Timeunit mapToCoordTimeUnit(TimeUnit tu) {
+ switch (tu) {
+ case days:
+ return Timeunit.DAY;
+
+ case hours:
+ return Timeunit.HOUR;
+
+ case minutes:
+ return Timeunit.MINUTE;
+
+ case months:
+ return Timeunit.MONTH;
+
+ default:
+ throw new IllegalArgumentException("Unhandled time unit " + tu);
+ }
+ }
+
+ @Override protected Path getLibPath(Cluster cluster, Path buildPath) throws FalconException {
+ return ProcessHelper.getUserLibPath(entity, cluster, buildPath);
+ }
+
+ @Override protected List<Properties> doBuild(Cluster cluster, Path buildPath) throws FalconException {
+ copyUserWorkflow(cluster, buildPath);
+
+ return OozieCoordinatorBuilder.get(entity, Tag.DEFAULT).buildCoords(cluster, buildPath);
+ }
+
+ private void copyUserWorkflow(Cluster cluster, Path buildPath) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+ // Copy the user workflow and lib to the staging dir
+ Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+ new Path(buildPath, EntityUtil.PROCESS_USER_DIR));
+ if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+ checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+ new Path(buildPath, EntityUtil.PROCESS_USERLIB_DIR)));
+ }
+
+ writeChecksums(fs, new Path(buildPath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy user workflow/lib", e);
+ }
+ }
+
+ private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
+ try {
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ for (Map.Entry<String, String> entry : checksums.entrySet()) {
+ stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+ }
+ } finally {
+ stream.close();
+ }
+ } catch (IOException e) {
+ throw new FalconException("Failed to write checksums", e);
+ }
+ }
+}
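
[Editorial note, not part of the patch] For reference, writeChecksums() above emits one <key>=<value> pair per line into the PROCESS_CHECKSUM_FILE, so the staged bundle might contain something like the following (keys and sums entirely hypothetical, assuming checksumAndCopy() maps file paths to checksums):

    hdfs://nn:8020/falcon/staging/myprocess/user/workflow.xml=3a1f...
    hdfs://nn:8020/falcon/staging/myprocess/userlib/udfs.jar=9c42...

UpdateHelper presumably reads this file back on entity update to decide whether the user workflow or lib has changed.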