Posted to commits@falcon.apache.org by sh...@apache.org on 2014/03/18 12:41:06 UTC
[1/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Repository: incubator-falcon
Updated Branches:
refs/heads/master 5e4352151 -> e2545b087
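At a glance, the merged builder is driven the way the new test below exercises it. A minimal sketch (the "corp" cluster and "clicksummary" process are the test fixtures used below; the staging path is illustrative):

    // Sketch only: mirrors the new OozieProcessWorkflowBuilderTest in this patch.
    Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
    Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.map(cluster, bundlePath); // writes bundle.xml, coordinator xml and workflow.xml under bundlePath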
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
new file mode 100644
index 0000000..44f5d80
--- /dev/null
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.converter;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Validity;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.messaging.EntityInstanceMessage;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.OozieProcessWorkflowBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Test for the Falcon entities mapping into Oozie artifacts.
+ */
+public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
+
+ private String hdfsUrl;
+ private FileSystem fs;
+
+ @BeforeClass
+ public void setUpDFS() throws Exception {
+ CurrentUser.authenticate("falcon");
+
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
+ Configuration conf = cluster.getConf();
+ hdfsUrl = conf.get("fs.default.name");
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ super.setup();
+
+ ConfigurationStore store = ConfigurationStore.get();
+ Cluster cluster = store.get(EntityType.CLUSTER, "corp");
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+ ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
+ fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
+ fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
+
+ Process process = store.get(EntityType.PROCESS, "clicksummary");
+ Path wfpath = new Path(process.getWorkflow().getPath());
+ assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
+ }
+
+ public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
+ assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
+ Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+ assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
+ assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
+ assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
+ assertEquals(process.getTimezone().getID(), coord.getTimezone());
+
+ assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
+ assertEquals(process.getOrder().name(), coord.getControls().getExecution());
+
+ assertEquals(process.getInputs().getInputs().get(0).getName(),
+ coord.getInputEvents().getDataIn().get(0).getName());
+ assertEquals(process.getInputs().getInputs().get(0).getName(),
+ coord.getInputEvents().getDataIn().get(0).getDataset());
+ assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
+ coord.getInputEvents().getDataIn().get(0).getStartInstance());
+ assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
+ coord.getInputEvents().getDataIn().get(0).getEndInstance());
+
+ assertEquals(process.getInputs().getInputs().get(1).getName(),
+ coord.getInputEvents().getDataIn().get(1).getName());
+ assertEquals(process.getInputs().getInputs().get(1).getName(),
+ coord.getInputEvents().getDataIn().get(1).getDataset());
+ assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
+ coord.getInputEvents().getDataIn().get(1).getStartInstance());
+ assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
+ coord.getInputEvents().getDataIn().get(1).getEndInstance());
+
+ assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
+ coord.getOutputEvents().getDataOut().get(1).getName());
+ assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
+ coord.getOutputEvents().getDataOut().get(2).getName());
+ assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
+ coord.getOutputEvents().getDataOut().get(3).getName());
+
+ assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+ coord.getOutputEvents().getDataOut().get(0).getName());
+ assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
+ coord.getOutputEvents().getDataOut().get(0).getInstance());
+ assertEquals(process.getOutputs().getOutputs().get(0).getName(),
+ coord.getOutputEvents().getDataOut().get(0).getDataset());
+
+ assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
+
+ ConfigurationStore store = ConfigurationStore.get();
+ Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
+ SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+ final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+ assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
+ assertEquals(feed.getTimezone().getID(), ds.getTimezone());
+ assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
+ assertEquals("", ds.getDoneFlag());
+ assertEquals(ds.getUriTemplate(),
+ FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ assertEquals(props.get("mapred.job.priority"), "LOW");
+
+ assertLibExtensions(coord);
+ }
+
+ @Test
+ public void testBundle() throws Exception {
+ String path = StartupProperties.get().getProperty("system.lib.location");
+ if (!new File(path).exists()) {
+ Assert.assertTrue(new File(path).mkdirs());
+ }
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+
+ WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+ testParentWorkflow(process, parentWorkflow);
+ }
+
+ @Test
+ public void testBundle1() throws Exception {
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+ process.setFrequency(Frequency.fromString("minutes(1)"));
+ process.setTimeout(Frequency.fromString("minutes(15)"));
+
+ WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
+ testParentWorkflow(process, parentWorkflow);
+ }
+
+ @Test
+ public void testPigProcessMapper() throws Exception {
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
+ Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
+
+ prepare(process);
+ WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+ Assert.assertEquals("user-pig-job", pigActionNode.getName());
+
+ final PIG pigAction = pigActionNode.getPig();
+ Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
+ Assert.assertNotNull(pigAction.getPrepare());
+ Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+ Assert.assertFalse(pigAction.getParam().isEmpty());
+ Assert.assertEquals(5, pigAction.getParam().size());
+ Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+ Assert.assertTrue(pigAction.getFile().size() > 0);
+
+ ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
+ Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
+ Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
+ }
+
+ @Test
+ public void testHiveProcessMapper() throws Exception {
+ URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+ Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+ resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+ Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+ resource = this.getClass().getResource("/config/process/hive-process.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+ prepare(process);
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ // verify table props
+ Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (expected.containsKey(entry.getKey())) {
+ Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+ }
+ }
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+ Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Assert.assertEquals(hiveAction.getScript(),
+ "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
+ Assert.assertNull(hiveAction.getPrepare());
+ Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+ Assert.assertFalse(hiveAction.getParam().isEmpty());
+ Assert.assertEquals(11, hiveAction.getParam().size());
+ }
+
+ private void prepare(Process process) throws IOException {
+ Path wf = new Path(process.getWorkflow().getPath());
+ fs.mkdirs(wf.getParent());
+ fs.create(wf).close();
+ }
+
+ @Test
+ public void testProcessMapperForTableStorage() throws Exception {
+ URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+ Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+ resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+ Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+ resource = this.getClass().getResource("/config/process/pig-process-table.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ // verify table props
+ Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (expected.containsKey(entry.getKey())) {
+ Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+ }
+ }
+
+ // verify the late data params
+ Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
+ Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+ // verify the post processing params
+ Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
+ Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+ }
+
+ private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
+ Cluster cluster) throws FalconException {
+ Map<String, String> expected = new HashMap<String, String>();
+ for (Input input : process.getInputs().getInputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+ propagateStorageProperties(input.getName(), storage, expected);
+ }
+
+ for (Output output : process.getOutputs().getOutputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+ propagateStorageProperties(output.getName(), storage, expected);
+ }
+
+ return expected;
+ }
+
+ private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + feedName;
+ props.put(prefix + "_storage_type", tableStorage.getType().name());
+ props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+ props.put(prefix + "_database", tableStorage.getDatabase());
+ props.put(prefix + "_table", tableStorage.getTable());
+
+ if (prefix.equals("falcon_input")) {
+ props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+ props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+ props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+ } else if (prefix.equals("falcon_output")) {
+ props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+ }
+ }
+
+ @Test
+ public void testProcessWorkflowMapper() throws Exception {
+ Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
+ Workflow processWorkflow = process.getWorkflow();
+ Assert.assertEquals("test", processWorkflow.getName());
+ Assert.assertEquals("1.0.0", processWorkflow.getVersion());
+ }
+
+ @SuppressWarnings("unchecked")
+ private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+ WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+ fs.open(new Path(wfPath, "workflow.xml")))).getValue();
+ List<Object> actions = wf.getDecisionOrForkOrJoin();
+ for (Object obj : actions) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ List<String> files = null;
+ if (action.getJava() != null) {
+ files = action.getJava().getFile();
+ } else if (action.getPig() != null) {
+ files = action.getPig().getFile();
+ } else if (action.getMapReduce() != null) {
+ files = action.getMapReduce().getFile();
+ }
+ if (files != null) {
+ Assert.assertTrue(files.get(files.size() - 1)
+ .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
+ }
+ }
+ }
+
+ private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
+ throws Exception {
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
+ Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ testDefCoordMap(process, coord);
+ assertEquals(coord.getControls().getThrottle(), throttle);
+ assertEquals(coord.getControls().getTimeout(), timeout);
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ return getParentWorkflow(new Path(wfPath));
+ }
+
+ public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
+ Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+ Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+ Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+ Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+ Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+ Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+ Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+ Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+ Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+ Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
+ Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
+ Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
+ Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
+ Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
+ Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
+ }
+
+ private COORDINATORAPP getCoordinator(Path path) throws Exception {
+ String bundleStr = readFile(path);
+
+ Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
+ SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+ Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
+ unmarshaller.setSchema(schema);
+ JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
+ new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), COORDINATORAPP.class);
+ return jaxbBundle.getValue();
+ }
+
+ @SuppressWarnings("unchecked")
+ private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
+ String workflow = readFile(new Path(path, "workflow.xml"));
+
+ JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+ Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+ return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
+ }
+
+ private BUNDLEAPP getBundle(Path path) throws Exception {
+ String bundleStr = readFile(new Path(path, "bundle.xml"));
+
+ Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
+ SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
+ Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
+ unmarshaller.setSchema(schema);
+ JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
+ new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
+ return jaxbBundle.getValue();
+ }
+
+ private String readFile(Path path) throws Exception {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
+ String line;
+ StringBuilder contents = new StringBuilder();
+ while ((line = reader.readLine()) != null) {
+ contents.append(line);
+ }
+ return contents.toString();
+ }
+
+ @Override
+ @AfterMethod
+ public void cleanup() throws Exception {
+ super.cleanup();
+ ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
+ ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
+ ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
+ ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
+ }
+
+ @Test
+ public void testProcessWithNoInputsAndOutputs() throws Exception {
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+
+ URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
+ Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
+
+ OozieProcessWorkflowBuilder builder = new OozieProcessWorkflowBuilder(processEntity);
+ Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
+ builder.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ String[] expected = {
+ EntityInstanceMessage.ARG.feedNames.getPropName(),
+ EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
+ "falconInputFeeds",
+ "falconInPaths",
+ "userWorkflowName",
+ "userWorkflowVersion",
+ "userWorkflowEngine",
+ };
+
+ for (String property : expected) {
+ Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index 1e7cc04..eb4173e 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -363,7 +363,7 @@ public class FeedEvictorTest {
}
}
- @Test
+ @Test(enabled = false)
public void testEvictionWithEmptyDirs() throws Exception {
try {
Configuration conf = cluster.getConf();
[3/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index e5a01ca..990fdc5 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -18,22 +18,64 @@
package org.apache.falcon.workflow;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.FalconRuntimException;
+import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.ExternalId;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Property;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.bundle.COORDINATOR;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.ObjectFactory;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.service.FalconPathFilter;
+import org.apache.falcon.service.SharedLibraryHostingService;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
import org.apache.oozie.client.OozieClient;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.Arrays;
import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
/**
* Base workflow builder for falcon entities.
@@ -44,6 +86,334 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
private static final Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
+ protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
+ protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
+
+ protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
+ protected static final String MR_QUEUE_NAME = "queueName";
+ protected static final String MR_JOB_PRIORITY = "jobPriority";
+
+ public static final Set<String> FALCON_ACTIONS = new HashSet<String>(
+ Arrays.asList(new String[]{"recordsize", "succeeded-post-processing", "failed-post-processing", }));
+
+ protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith("falcon");
+ }
+
+ @Override
+ public String getJarName(Path path) {
+ String name = path.getName();
+ if (name.endsWith(".jar")) {
+ name = name.substring(0, name.indexOf(".jar"));
+ }
+ return name;
+ }
+ };
+
+ protected OozieWorkflowBuilder(T entity) {
+ super(entity);
+ }
+
+ protected Path getCoordPath(Path bundlePath, String coordName) {
+ Tag tag = EntityUtil.getWorkflowNameTag(coordName, entity);
+ return new Path(bundlePath, tag.name());
+ }
+
+ protected abstract Map<String, String> getEntityProperties();
+
+ public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
+ BUNDLEAPP bundleApp = new BUNDLEAPP();
+ bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
+ // all properties are set before the bundle and coordinators are created
+
+ List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
+ if (coordinators.size() == 0) {
+ return false;
+ }
+ for (COORDINATORAPP coordinatorapp : coordinators) {
+ Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
+ String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
+ EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+ createLogsDir(cluster, coordPath);
+ COORDINATOR bundleCoord = new COORDINATOR();
+ bundleCoord.setName(coordinatorapp.getName());
+ bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+ bundleApp.getCoordinator().add(bundleCoord);
+
+ copySharedLibs(cluster, coordPath);
+ }
+
+ marshal(cluster, bundleApp, bundlePath);
+ return true;
+ }
+
+ private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
+ FileStatus[] libs = null;
+ try {
+ libs = fs.listStatus(path);
+ } catch(FileNotFoundException ignore) {
+ //Ok if the libext is not configured
+ }
+
+ if (libs == null) {
+ return;
+ }
+
+ for(FileStatus lib : libs) {
+ if (lib.isDir()) {
+ continue;
+ }
+
+ for(Object obj: wf.getDecisionOrForkOrJoin()) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ List<String> files = null;
+ if (action.getJava() != null) {
+ files = action.getJava().getFile();
+ } else if (action.getPig() != null) {
+ files = action.getPig().getFile();
+ } else if (action.getMapReduce() != null) {
+ files = action.getMapReduce().getFile();
+ }
+ if (files != null) {
+ files.add(lib.getPath().toString());
+ }
+ }
+ }
+ }
+
+ protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
+ throws IOException, FalconException {
+ String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+ addExtensionJars(fs, new Path(libext), wf);
+ addExtensionJars(fs, new Path(libext, type.name()), wf);
+ if (StringUtils.isNotEmpty(lifecycle)) {
+ addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
+ }
+ }
+
+ private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
+ try {
+ Path libPath = new Path(coordPath, "lib");
+ SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
+ libPath, cluster, FALCON_JAR_FILTER);
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
+ }
+ }
+
+ public abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
+
+ protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
+ org.apache.falcon.oozie.coordinator.CONFIGURATION conf
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+ List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
+ for (Entry<String, String> prop : propMap.entrySet()) {
+ props.add(createCoordProperty(prop.getKey(), prop.getValue()));
+ }
+ return conf;
+ }
+
+ protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+ Map<String, String> props = new HashMap<String, String>();
+ props.put(ARG.entityName.getPropName(), entity.getName());
+ props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
+ props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+ props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
+ props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
+ String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
+ "tcp://localhost:61616?daemon=true");
+ props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
+ String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
+ ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
+ String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+ DEFAULT_BROKER_MSG_TTL.toString());
+ props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
+ props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
+ props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
+ props.put(OozieClient.EXTERNAL_ID,
+ new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+ "${coord:nominalTime()}").getId());
+ props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+ try {
+ if (EntityUtil.getLateProcess(entity) == null
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
+ props.put("shouldRecord", "false");
+ } else {
+ props.put("shouldRecord", "true");
+ }
+ } catch (FalconException e) {
+ LOG.error("Unable to get Late Process for entity:" + entity, e);
+ throw new FalconRuntimException(e);
+ }
+ props.put("entityName", entity.getName());
+ props.put("entityType", entity.getEntityType().name().toLowerCase());
+ props.put(ARG.cluster.getPropName(), cluster.getName());
+ if (cluster.getProperties() != null) {
+ for (Property prop : cluster.getProperties().getProperties()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ }
+
+ props.put(MR_QUEUE_NAME, "default");
+ props.put(MR_JOB_PRIORITY, "NORMAL");
+ // props defined in the entity override the props set above.
+ props.putAll(getEntityProperties());
+ return props;
+ }
+
+ protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
+ String value) {
+ org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+ prop.setName(name);
+ prop.setValue(value);
+ return prop;
+ }
+
+ protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
+ throws FalconException {
+ try {
+ Marshaller marshaller = jaxbContext.createMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ outPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ OutputStream out = fs.create(outPath);
+ try {
+ marshaller.marshal(jaxbElement, out);
+ } finally {
+ out.close();
+ }
+ if (LOG.isDebugEnabled()) {
+ StringWriter writer = new StringWriter();
+ marshaller.marshal(jaxbElement, writer);
+ LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
+ LOG.debug(writer.getBuffer());
+ }
+
+ LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
+ } catch (Exception e) {
+ throw new FalconException("Unable to marshall app object", e);
+ }
+ }
+
+ private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(
+ coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
+ Path logsDir = new Path(coordPath, "../../logs");
+ fs.mkdirs(logsDir);
+
+ // logs are copied within Oozie as the user during post-processing, hence 777 permissions
+ FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+ fs.setPermission(logsDir, permission);
+ } catch (Exception e) {
+ throw new FalconException("Unable to create temp dir in " + coordPath, e);
+ }
+ }
+
+ protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
+ if (StringUtils.isEmpty(name)) {
+ name = "coordinator";
+ }
+ name = name + ".xml";
+ marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), OozieUtils.COORD_JAXB_CONTEXT,
+ new Path(outPath, name));
+ return name;
+ }
+
+ protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
+ marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+ OozieUtils.BUNDLE_JAXB_CONTEXT, new Path(outPath, "bundle.xml"));
+ }
+
+ protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
+ marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
+ OozieUtils.WORKFLOW_JAXB_CONTEXT, new Path(outPath, "workflow.xml"));
+ }
+
+ protected String getStoragePath(Path path) {
+ if (path != null) {
+ return getStoragePath(path.toString());
+ }
+ return null;
+ }
+
+ protected String getStoragePath(String path) {
+ if (StringUtils.isNotEmpty(path)) {
+ if (new Path(path).toUri().getScheme() == null) {
+ path = "${nameNode}" + path;
+ }
+ }
+ return path;
+ }
+
+ protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = OozieUtils.WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ resourceAsStream);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
+
+ protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
+ InputStream resourceAsStream = null;
+ try {
+ resourceAsStream = OozieWorkflowBuilder.class.getResourceAsStream(template);
+ Unmarshaller unmarshaller = OozieUtils.COORD_JAXB_CONTEXT.createUnmarshaller();
+ @SuppressWarnings("unchecked")
+ JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
+ unmarshaller.unmarshal(resourceAsStream);
+ return jaxbElement.getValue();
+ } catch (JAXBException e) {
+ throw new FalconException(e);
+ } finally {
+ IOUtils.closeQuietly(resourceAsStream);
+ }
+ }
+
+ protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
+ Cluster cluster, String prefix) throws IOException {
+ Configuration hiveConf = new Configuration(false);
+ hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
+ hiveConf.set("hive.metastore.local", "false");
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+ ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
+ hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ }
+
+ OutputStream out = null;
+ try {
+ out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
+ hiveConf.writeXml(out);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ protected void decorateWithOozieRetries(ACTION action) {
+ Properties props = RuntimeProperties.get();
+ action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
+ action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
+ }
+
protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
Properties properties = new Properties();
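A note on the new base-class shape: map() owns the bundle and coordinator marshalling, and subclasses plug in via getCoordinators() and getEntityProperties(). A minimal sketch of that contract (the class name and template resource are hypothetical, and the remaining WorkflowBuilder overrides, such as newWorkflowSchedule, are elided):

    // Sketch only: shape of a concrete builder against the merged base class.
    public class MyWorkflowBuilder extends OozieWorkflowBuilder<Process> {
        public MyWorkflowBuilder(Process entity) {
            super(entity);
        }

        @Override
        public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
            // unmarshal a packaged template (hypothetical resource) and name it per the entity
            COORDINATORAPP coord = getCoordinatorTemplate("/config/coordinator/my-coordinator.xml");
            coord.setName(EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString());
            return Arrays.asList(coord);
        }

        @Override
        protected Map<String, String> getEntityProperties() {
            return new HashMap<String, String>(); // entity props override the coord defaults
        }
    }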
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index ac8862e..d819e93 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -116,7 +116,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
if (!schedClusters.isEmpty()) {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
- Map<String, Properties> newFlows = builder.newWorkflowSchedule(entity, schedClusters);
+ Map<String, Properties> newFlows = builder.newWorkflowSchedule(schedClusters.toArray(new
+ String[schedClusters.size()]));
for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
String cluster = entry.getKey();
LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster);
@@ -380,7 +381,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
List<Instance> runInstances = new ArrayList<Instance>();
- String[] wfNames = builder.getWorkflowNames(entity);
+ String[] wfNames = builder.getWorkflowNames();
List<String> coordNames = new ArrayList<String>();
for (String wfName : wfNames) {
if (EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString().equals(wfName)) {
@@ -1059,11 +1060,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
throws FalconException {
- WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
- Properties bundleProps = builder.newWorkflowSchedule(entity, startDate, cluster, user);
+ Entity clone = entity.copy();
+ EntityUtil.setStartDate(entity, cluster, startDate);
+ WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, clone);
+ Map<String, Properties> bundleProps = builder.newWorkflowSchedule(cluster);
LOG.info("Scheduling " + entity.toShortString() + " on cluster " + cluster + " with props " + bundleProps);
- if (bundleProps != null) {
- return scheduleEntity(cluster, bundleProps, entity);
+ if (bundleProps != null && bundleProps.size() > 0) {
+ return scheduleEntity(cluster, bundleProps.get(cluster), entity);
} else {
LOG.info("No new workflow to be scheduled for this " + entity.toShortString());
return null;
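The engine-side change reflects the builder now being constructed around the (possibly cloned) entity, so scheduling passes only cluster names. Both call sites above are consistent with a String... varargs signature, which this sketch assumes:

    // Sketch only: cluster names are illustrative.
    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE, entity);
    Map<String, Properties> newFlows = builder.newWorkflowSchedule("clusterA", "clusterB");
    for (Map.Entry<String, Properties> entry : newFlows.entrySet()) {
        scheduleEntity(entry.getKey(), entry.getValue(), entity); // one bundle submission per cluster
    }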
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
deleted file mode 100644
index e638961..0000000
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ /dev/null
@@ -1,833 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.ProcessHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Property;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.coordinator.CONTROLS;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.DATAIN;
-import org.apache.falcon.oozie.coordinator.DATAOUT;
-import org.apache.falcon.oozie.coordinator.DATASETS;
-import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DELETE;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.PREPARE;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.update.UpdateHelper;
-import org.apache.hadoop.fs.*;
-import org.apache.xerces.dom.ElementNSImpl;
-import org.w3c.dom.Document;
-
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.dom.DOMResult;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class maps the Falcon entities into Oozie artifacts.
- */
-public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
- private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
- private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
- public OozieProcessMapper(Process entity) {
- super(entity);
- }
-
- @Override
- protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
- Process process = getEntity();
-
- //Copy user workflow and lib to staging dir
- Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getPath()),
- new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
- if (process.getWorkflow().getLib() != null && fs.exists(new Path(process.getWorkflow().getLib()))) {
- checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(process.getWorkflow().getLib()),
- new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
- }
-
- writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
- } catch (IOException e) {
- throw new FalconException("Failed to copy user workflow/lib", e);
- }
-
- List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
- apps.add(createDefaultCoordinator(cluster, bundlePath));
-
- return apps;
- }
-
- private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
- try {
- FSDataOutputStream stream = fs.create(path);
- try {
- for (Map.Entry<String, String> entry : checksums.entrySet()) {
- stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
- }
- } finally {
- stream.close();
- }
- } catch (IOException e) {
- throw new FalconException("Failed to copy user workflow/lib", e);
- }
- }
-
- private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
- Process process = getEntity();
- Path wfPath = new Path(process.getWorkflow().getPath());
- if (fs.isFile(wfPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
- } else {
- return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
- }
- } catch(IOException e) {
- throw new FalconException("Failed to get workflow path", e);
- }
- }
-
- private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
- try {
- Process process = getEntity();
- if (process.getWorkflow().getLib() == null) {
- return null;
- }
- Path libPath = new Path(process.getWorkflow().getLib());
-
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
- if (fs.isFile(libPath)) {
- return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
- } else {
- return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
- }
- } catch(IOException e) {
- throw new FalconException("Failed to get user lib path", e);
- }
- }
-
- /**
- * Creates default oozie coordinator.
- *
- * @param cluster - Cluster for which the coordinator app needs to be created
- * @param bundlePath - bundle path
- * @return COORDINATORAPP
- * @throws FalconException on Error
- */
- public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
- Process process = getEntity();
- if (process == null) {
- return null;
- }
-
- COORDINATORAPP coord = new COORDINATORAPP();
- String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString();
- Path coordPath = getCoordPath(bundlePath, coordName);
-
- // coord attributes
- initializeCoordAttributes(cluster, process, coord, coordName);
-
- CONTROLS controls = initializeControls(process); // controls
- coord.setControls(controls);
-
- // Configuration
- Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
-
- initializeInputPaths(cluster, process, coord, props); // inputs
- initializeOutputPaths(cluster, process, coord, props); // outputs
-
- Workflow processWorkflow = process.getWorkflow();
- propagateUserWorkflowProperties(processWorkflow, props, process.getName());
-
- // create parent wf
- createWorkflow(cluster, process, processWorkflow, coordName, coordPath);
-
- WORKFLOW wf = new WORKFLOW();
- wf.setAppPath(getStoragePath(coordPath.toString()));
- wf.setConfiguration(getCoordConfig(props));
-
- // set coord action to parent wf
- org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
- action.setWorkflow(wf);
- coord.setAction(action);
-
- return coord;
- }
-
- private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
- coord.setName(coordName);
- org.apache.falcon.entity.v0.process.Cluster processCluster =
- ProcessHelper.getCluster(process, cluster.getName());
- coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
- coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
- coord.setTimezone(process.getTimezone().getID());
- coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
- }
-
- private CONTROLS initializeControls(Process process)
- throws FalconException {
- CONTROLS controls = new CONTROLS();
- controls.setConcurrency(String.valueOf(process.getParallel()));
- controls.setExecution(process.getOrder().name());
-
- Frequency timeout = process.getTimeout();
- long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
- long timeoutInMillis;
- if (timeout != null) {
- timeoutInMillis = ExpressionHelper.get().
- evaluate(process.getTimeout().toString(), Long.class);
- } else {
- timeoutInMillis = frequencyInMillis * 6;
- if (timeoutInMillis < THIRTY_MINUTES) {
- timeoutInMillis = THIRTY_MINUTES;
- }
- }
- controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
-
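- // throttle defaults to twice the number of instances that fit in the timeout window (skipped if zero)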
- if (timeoutInMillis / frequencyInMillis * 2 > 0) {
- controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
- }
-
- return controls;
- }
-
- private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props) throws FalconException {
- if (process.getInputs() == null) {
- props.put("falconInputFeeds", "NONE");
- props.put("falconInPaths", "IGNORE");
- return;
- }
-
- List<String> inputFeeds = new ArrayList<String>();
- List<String> inputPaths = new ArrayList<String>();
- List<String> inputFeedStorageTypes = new ArrayList<String>();
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
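- // optional inputs are not registered as datasets/input-events, so the coordinator does not wait on them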
- if (!input.isOptional()) {
- if (coord.getDatasets() == null) {
- coord.setDatasets(new DATASETS());
- }
- if (coord.getInputEvents() == null) {
- coord.setInputEvents(new INPUTEVENTS());
- }
-
- SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
- coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
- DATAIN datain = createDataIn(input);
- coord.getInputEvents().getDataIn().add(datain);
- }
-
- String inputExpr = null;
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
- props.put(input.getName(), inputExpr);
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- inputExpr = "${coord:dataIn('" + input.getName() + "')}";
- propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
- }
-
- inputFeeds.add(feed.getName());
- inputPaths.add(inputExpr);
- inputFeedStorageTypes.add(storage.getType().name());
- }
-
- propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
- }
-
- private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
- List<String> inputFeedStorageTypes, Map<String, String> props) {
- // populate late data handler - should-record action
- props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
- props.put("falconInPaths", join(inputPaths.iterator(), '#'));
-
- // storage type for each corresponding feed sent as a param to LateDataHandler
- // needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
- }
-
- private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props) throws FalconException {
- if (process.getOutputs() == null) {
- props.put(ARG.feedNames.getPropName(), "NONE");
- props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
- return;
- }
-
- if (coord.getDatasets() == null) {
- coord.setDatasets(new DATASETS());
- }
-
- if (coord.getOutputEvents() == null) {
- coord.setOutputEvents(new OUTPUTEVENTS());
- }
-
- List<String> outputFeeds = new ArrayList<String>();
- List<String> outputPaths = new ArrayList<String>();
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
- coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
-
- DATAOUT dataout = createDataOut(output);
- coord.getOutputEvents().getDataOut().add(dataout);
-
- String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
- outputFeeds.add(feed.getName());
- outputPaths.add(outputExpr);
-
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- props.put(output.getName(), outputExpr);
-
- propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
- }
- }
-
- // Output feed name and path for parent workflow
- props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
- props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
- }
-
- private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
- String datasetName, LocationType locationType) throws FalconException {
-
- SYNCDATASET syncdataset = new SYNCDATASET();
- syncdataset.setName(datasetName);
- syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
- String uriTemplate = storage.getUriTemplate(locationType);
- if (storage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- syncdataset.setUriTemplate(uriTemplate);
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
- syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
- syncdataset.setTimezone(feed.getTimezone().getID());
-
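- // an empty done-flag tells oozie the instance is ready as soon as its directory exists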
- if (feed.getAvailabilityFlag() == null) {
- syncdataset.setDoneFlag("");
- } else {
- syncdataset.setDoneFlag(feed.getAvailabilityFlag());
- }
-
- return syncdataset;
- }
-
- private DATAOUT createDataOut(Output output) {
- DATAOUT dataout = new DATAOUT();
- dataout.setName(output.getName());
- dataout.setDataset(output.getName());
- dataout.setInstance(getELExpression(output.getInstance()));
- return dataout;
- }
-
- private DATAIN createDataIn(Input input) {
- DATAIN datain = new DATAIN();
- datain.setName(input.getName());
- datain.setDataset(input.getName());
- datain.setStartInstance(getELExpression(input.getStart()));
- datain.setEndInstance(getELExpression(input.getEnd()));
- return datain;
- }
-
- private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
- Storage storage, Map<String, String> props)
- throws FalconException {
-
- // stats and meta paths
- createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
- createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
- createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
- }
-
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
- private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
- COORDINATORAPP coord, Map<String, String> props, Storage storage)
- throws FalconException {
-
- String name = output.getName();
- String type = locType.name().toLowerCase();
-
- SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
- coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
-
- DATAOUT dataout = new DATAOUT();
- dataout.setName(name + type);
- dataout.setDataset(name + type);
- dataout.setInstance(getELExpression(output.getInstance()));
-
- OUTPUTEVENTS outputEvents = coord.getOutputEvents();
- if (outputEvents == null) {
- outputEvents = new OUTPUTEVENTS();
- coord.setOutputEvents(outputEvents);
- }
- outputEvents.getDataOut().add(dataout);
-
- String outputExpr = "${coord:dataOut('" + name + type + "')}";
- props.put(name + "." + type, outputExpr);
- }
- //RESUME CHECKSTYLE CHECK ParameterNumberCheck
-
- private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
- Map<String, String> props, String prefix) {
- props.put(prefix + "_storage_type", tableStorage.getType().name());
- props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
- props.put(prefix + "_database", tableStorage.getDatabase());
- props.put(prefix + "_table", tableStorage.getTable());
- }
-
- private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
- Map<String, String> props) {
- String prefix = "falcon_" + input.getName();
-
- propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
- props.put(prefix + "_partition_filter_pig",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
- props.put(prefix + "_partition_filter_hive",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
- props.put(prefix + "_partition_filter_java",
- "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
- }
-
- private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
- Map<String, String> props) {
- String prefix = "falcon_" + output.getName();
-
- propagateCommonCatalogTableProperties(tableStorage, props, prefix);
-
- props.put(prefix + "_dataout_partitions",
- "${coord:dataOutPartitions('" + output.getName() + "')}");
- props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
- + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
- }
-
- private String join(Iterator<String> itr, char sep) {
- String joinedStr = StringUtils.join(itr, sep);
- if (joinedStr.isEmpty()) {
- joinedStr = "null";
- }
- return joinedStr;
- }
-
- private String getELExpression(String expr) {
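- // wraps a raw expression in oozie EL syntax: expr -> ${expr}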
- if (expr != null) {
- expr = "${" + expr + "}";
- }
- return expr;
- }
-
- @Override
- protected Map<String, String> getEntityProperties() {
- Process process = getEntity();
- Map<String, String> props = new HashMap<String, String>();
- if (process.getProperties() != null) {
- for (Property prop : process.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
- return props;
- }
-
- private void propagateUserWorkflowProperties(Workflow processWorkflow,
- Map<String, String> props, String processName) {
- props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
- processWorkflow.getName(), processName));
- props.put("userWorkflowVersion", processWorkflow.getVersion());
- props.put("userWorkflowEngine", processWorkflow.getEngine().value());
- }
-
- protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
- String wfName, Path parentWfPath) throws FalconException {
- WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
- wfApp.setName(wfName);
- try {
- addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
- } catch (IOException e) {
- throw new FalconException("Failed to add library extensions for the workflow", e);
- }
-
- String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
- EngineType engineType = processWorkflow.getEngine();
- for (Object object : wfApp.getDecisionOrForkOrJoin()) {
- if (!(object instanceof ACTION)) {
- continue;
- }
-
- ACTION action = (ACTION) object;
- String actionName = action.getName();
- if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
- action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
- } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
- decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
- } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
- decorateHiveAction(cluster, process, action, parentWfPath);
- } else if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- }
- }
-
- //Create parent workflow
- marshal(cluster, wfApp, parentWfPath);
- }
-
- private void decoratePIGAction(Cluster cluster, Process process,
- PIG pigAction, Path parentWfPath) throws FalconException {
- Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
- pigAction.setScript("${nameNode}" + userWfPath.toString());
-
- addPrepareDeleteOutputPath(process, pigAction);
-
- final List<String> paramList = pigAction.getParam();
- addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
- addOutputFeedsAsParams(paramList, process, cluster);
-
- propagateProcessProperties(pigAction, process);
-
- Storage.TYPE storageType = getStorageType(cluster, process);
- if (Storage.TYPE.TABLE == storageType) {
- // adds hive-site.xml in pig classpath
- setupHiveConfiguration(cluster, parentWfPath, ""); // DO NOT ADD PREFIX!!!
- pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
- }
-
- addArchiveForCustomJars(cluster, pigAction.getArchive(),
- getUserLibPath(cluster, parentWfPath.getParent()));
- }
-
- private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
- Path parentWfPath) throws FalconException {
-
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
- org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
- Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
- hiveAction.setScript("${nameNode}" + userWfPath.toString());
-
- addPrepareDeleteOutputPath(process, hiveAction);
-
- final List<String> paramList = hiveAction.getParam();
- addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
- addOutputFeedsAsParams(paramList, process, cluster);
-
- propagateProcessProperties(hiveAction, process);
-
- setupHiveConfiguration(cluster, parentWfPath, "falcon-");
-
- addArchiveForCustomJars(cluster, hiveAction.getArchive(),
- getUserLibPath(cluster, parentWfPath.getParent()));
-
- marshalHiveAction(wfAction, actionJaxbElement);
- }
-
- private void addPrepareDeleteOutputPath(Process process,
- PIG pigAction) throws FalconException {
- List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
- if (deleteOutputPathList.isEmpty()) {
- return;
- }
-
- final PREPARE prepare = new PREPARE();
- final List<DELETE> deleteList = prepare.getDelete();
-
- for (String deletePath : deleteOutputPathList) {
- final DELETE delete = new DELETE();
- delete.setPath(deletePath);
- deleteList.add(delete);
- }
-
- if (!deleteList.isEmpty()) {
- pigAction.setPrepare(prepare);
- }
- }
-
- private void addPrepareDeleteOutputPath(Process process,
- org.apache.falcon.oozie.hive.ACTION hiveAction)
- throws FalconException {
-
- List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
- if (deleteOutputPathList.isEmpty()) {
- return;
- }
-
- org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
- List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
-
- for (String deletePath : deleteOutputPathList) {
- org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
- delete.setPath(deletePath);
- deleteList.add(delete);
- }
-
- if (!deleteList.isEmpty()) {
- hiveAction.setPrepare(prepare);
- }
- }
-
- private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
- final List<String> deleteList = new ArrayList<String>();
- if (process.getOutputs() == null) {
- return deleteList;
- }
-
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
-
- if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
- continue; // prepare delete only applies to FileSystem storage
- }
-
- deleteList.add("${wf:conf('" + output.getName() + "')}");
- }
-
- return deleteList;
- }
-
- private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
- String engineType) throws FalconException {
- if (process.getInputs() == null) {
- return;
- }
-
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- final String inputName = input.getName();
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
- Map<String, String> props = new HashMap<String, String>();
- propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
- for (String key : props.keySet()) {
- paramList.add(key + "=${wf:conf('" + key + "')}");
- }
-
- paramList.add(paramName + "_filter=${wf:conf('"
- + paramName + "_partition_filter_" + engineType + "')}");
- }
- }
- }
-
- private void addOutputFeedsAsParams(List<String> paramList, Process process,
- Cluster cluster) throws FalconException {
- if (process.getOutputs() == null) {
- return;
- }
-
- for (Output output : process.getOutputs().getOutputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
- Storage storage = FeedHelper.createStorage(cluster, feed);
-
- if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- final String outputName = output.getName(); // no prefix for backwards compatibility
- paramList.add(outputName + "=${" + outputName + "}");
- } else if (storage.getType() == Storage.TYPE.TABLE) {
- Map<String, String> props = new HashMap<String, String>();
- propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
- for (String key : props.keySet()) {
- paramList.add(key + "=${wf:conf('" + key + "')}");
- }
- }
- }
- }
-
- private void propagateProcessProperties(PIG pigAction, Process process) {
- org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
- if (processProperties == null) {
- return;
- }
-
- // Propagate user defined properties to job configuration
- final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
- pigAction.getConfiguration().getProperty();
-
- // Propagate user defined properties to pig script as macros
- // passed as parameters -p name=value that can be accessed as $name
- final List<String> paramList = pigAction.getParam();
-
- for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
- org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
- new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
- configProperty.setName(property.getName());
- configProperty.setValue(property.getValue());
- configuration.add(configProperty);
-
- paramList.add(property.getName() + "=" + property.getValue());
- }
- }
-
- private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
- org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
- if (processProperties == null) {
- return;
- }
-
- // Propagate user defined properties to job configuration
- final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
- hiveAction.getConfiguration().getProperty();
-
- // Propagate user defined properties to the hive script
- // passed as parameters name=value that can be accessed in the script as ${name}
- final List<String> paramList = hiveAction.getParam();
-
- for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
- org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
- new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
- configProperty.setName(property.getName());
- configProperty.setValue(property.getValue());
- configuration.add(configProperty);
-
- paramList.add(property.getName() + "=" + property.getValue());
- }
- }
-
- private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
- Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
- if (process.getInputs() == null) {
- return storageType;
- }
-
- for (Input input : process.getInputs().getInputs()) {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
- storageType = FeedHelper.getStorageType(feed, cluster);
- if (Storage.TYPE.TABLE == storageType) {
- break;
- }
- }
-
- return storageType;
- }
-
- // creates hive-site.xml configuration in conf dir.
- private void setupHiveConfiguration(Cluster cluster, Path wfPath,
- String prefix) throws FalconException {
- String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
- Path confPath = new Path(wfPath, "conf");
- createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
- } catch (IOException e) {
- throw new FalconException(e);
- }
- }
-
- private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
- Path libPath) throws FalconException {
- if (libPath == null) {
- return;
- }
-
- try {
- final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
- if (fs.isFile(libPath)) { // File, not a Dir
- archiveList.add(libPath.toString());
- return;
- }
-
- // lib path is a directory, add each file under the lib dir to archive
- final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
- @Override
- public boolean accept(Path path) {
- try {
- return fs.isFile(path) && path.getName().endsWith(".jar");
- } catch (IOException ignore) {
- return false;
- }
- }
- });
-
- for (FileStatus fileStatus : fileStatuses) {
- archiveList.add(fileStatus.getPath().toString());
- }
- } catch (IOException e) {
- throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
- }
- }
-
- @SuppressWarnings("unchecked")
- protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
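- // the hive action is embedded in the workflow as an xs:any DOM node, so it needs explicit JAXB unmarshalling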
- try {
- Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
- unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
- return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
- unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to unmarshall hive action.", e);
- }
- }
-
- protected void marshalHiveAction(ACTION wfAction,
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
- try {
- DOMResult hiveActionDOM = new DOMResult();
- Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
- marshaller.marshal(actionjaxbElement, hiveActionDOM);
- wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to marshall hive action.", e);
- }
- }
-}
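For orientation, a minimal caller-side sketch of how the merged process builder is driven; this assumes OozieProcessWorkflowBuilder exposes the same getCoordinators(cluster, path) entry point that the feed-side test later in this patch exercises, and the process name and staging path are hypothetical:

    // sketch only, modeled on the feed builder usage in the tests that follow
    Process process = EntityUtil.getEntity(EntityType.PROCESS, "sample-process");
    OozieWorkflowBuilder builder = new OozieProcessWorkflowBuilder(process);
    List<COORDINATORAPP> coords = builder.getCoordinators(cluster, new Path("/projects/falcon/"));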
[4/5] FALCON-356 Merge OozieProcessMapper and
OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
deleted file mode 100644
index e610df2..0000000
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ /dev/null
@@ -1,505 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.JAVA;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tests for Oozie workflow definition for feed replication & retention.
- */
-public class OozieFeedMapperTest {
- private EmbeddedCluster srcMiniDFS;
- private EmbeddedCluster trgMiniDFS;
- private final ConfigurationStore store = ConfigurationStore.get();
- private Cluster srcCluster;
- private Cluster trgCluster;
- private Cluster alphaTrgCluster;
- private Cluster betaTrgCluster;
- private Feed feed;
- private Feed tableFeed;
- private Feed fsReplFeed;
-
- private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
- private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
- private static final String FEED = "/feed.xml";
- private static final String TABLE_FEED = "/table-replication-feed.xml";
- private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
-
- @BeforeClass
- public void setUpDFS() throws Exception {
- CurrentUser.authenticate("falcon");
-
- srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
- String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
-
- trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
- String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
-
- cleanupStore();
-
- srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
- trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
- alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
- betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
-
- feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
- fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
- tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
- }
-
- protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
- Unmarshaller unmarshaller = type.getUnmarshaller();
- Entity entity = (Entity) unmarshaller
- .unmarshal(OozieFeedMapperTest.class.getResource(template));
- store.publish(type, entity);
-
- if (type == EntityType.CLUSTER) {
- Cluster cluster = (Cluster) entity;
- ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
- FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
- fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
- fs.create(
- new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
- }
- return entity;
- }
-
- protected void cleanupStore() throws FalconException {
- for (EntityType type : EntityType.values()) {
- Collection<String> entities = store.getEntities(type);
- for (String entity : entities) {
- store.remove(type, entity);
- }
- }
- }
-
- @AfterClass
- public void stopDFS() {
- srcMiniDFS.shutdown();
- trgMiniDFS.shutdown();
- }
-
- @Test
- public void testReplicationCoordsForFSStorage() throws Exception {
- OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
- List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster,
- new Path("/projects/falcon/"));
- //Assert retention coord
- COORDINATORAPP coord = coords.get(0);
- assertLibExtensions(coord, "retention");
-
- //Assert replication coord
- coord = coords.get(1);
- Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
- Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
- .getAction().getWorkflow().getAppPath());
- Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
- + srcCluster.getName(), coord.getName());
- Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
- SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
- .getDatasetOrAsyncDataset().get(0);
- SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
- .getDatasetOrAsyncDataset().get(1);
-
- Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
- Assert.assertEquals("input-dataset", inputDataset.getName());
- Assert.assertEquals(
- ClusterHelper.getReadOnlyStorageUrl(srcCluster)
- + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
- inputDataset.getUriTemplate());
-
- Assert.assertEquals("${coord:minutes(20)}",
- outputDataset.getFrequency());
- Assert.assertEquals("output-dataset", outputDataset.getName());
- Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
- + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
- outputDataset.getUriTemplate());
-        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
- String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
- Assert.assertEquals("input", inEventName);
- Assert.assertEquals("input-dataset", inEventDataset);
- Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
- String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
- Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- // verify the replication param that feed replicator depends on
- String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
- Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
-
- Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
- Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
-
- // verify the late data params
- Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
- Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
- Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
-
- // verify the post processing params
- Assert.assertEquals(props.get("feedNames"), feed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
-
- // verify workflow params
- Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
- Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
- Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
-
- // verify default params
- Assert.assertEquals(props.get("queueName"), "default");
- Assert.assertEquals(props.get("jobPriority"), "NORMAL");
- Assert.assertEquals(props.get("maxMaps"), "5");
-
- assertLibExtensions(coord, "replication");
- assertWorkflowRetries(coord);
- }
-
- private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
- WORKFLOWAPP wf = getWorkflowapp(coord);
- List<Object> actions = wf.getDecisionOrForkOrJoin();
- for (Object obj : actions) {
- if (!(obj instanceof ACTION)) {
- continue;
- }
- ACTION action = (ACTION) obj;
- String actionName = action.getName();
- if (AbstractOozieEntityMapper.FALCON_ACTIONS.contains(actionName)) {
- Assert.assertEquals(action.getRetryMax(), "3");
- Assert.assertEquals(action.getRetryInterval(), "1");
- }
- }
- }
-
- private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
- WORKFLOWAPP wf = getWorkflowapp(coord);
- List<Object> actions = wf.getDecisionOrForkOrJoin();
- for (Object obj : actions) {
- if (!(obj instanceof ACTION)) {
- continue;
- }
- ACTION action = (ACTION) obj;
- List<String> files = null;
- if (action.getJava() != null) {
- files = action.getJava().getFile();
- } else if (action.getPig() != null) {
- files = action.getPig().getFile();
- } else if (action.getMapReduce() != null) {
- files = action.getMapReduce().getFile();
- }
- if (files != null) {
- Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
- + lifecycle + "/ext.jar"));
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
- String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
- return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
- trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
- }
-
- @Test
- public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
- OozieFeedMapper feedMapper = new OozieFeedMapper(fsReplFeed);
-
- List<COORDINATORAPP> alphaCoords = feedMapper.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
- final COORDINATORAPP alphaCoord = alphaCoords.get(0);
- Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
- Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
-
- String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
- assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
-
- List<COORDINATORAPP> betaCoords = feedMapper.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
- final COORDINATORAPP betaCoord = betaCoords.get(0);
- Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
- Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
-
- pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
- assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
- }
-
- private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
- Feed aFeed) throws FalconException {
- String srcPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
- srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
- String targetPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
- targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
-
- StringBuilder pathsWithPartitions = new StringBuilder();
- pathsWithPartitions.append("${coord:dataIn('input')}/")
- .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
- String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
- parts = StringUtils.stripEnd(parts, "/");
- return parts;
- }
-
- private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
- String pathsWithPartitions) throws JAXBException, IOException {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
- Date startDate = feedCluster.getValidity().getStart();
- Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
-
- Date endDate = feedCluster.getValidity().getEnd();
- Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
-
- WORKFLOWAPP workflow = getWorkflowapp(coord);
- assertWorkflowDefinition(fsReplFeed, workflow);
-
- List<Object> actions = workflow.getDecisionOrForkOrJoin();
- System.out.println("actions = " + actions);
-
- ACTION replicationActionNode = (ACTION) actions.get(4);
- Assert.assertEquals(replicationActionNode.getName(), "replication");
-
- JAVA replication = replicationActionNode.getJava();
- List<String> args = replication.getArg();
- Assert.assertEquals(args.size(), 11);
-
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
- Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
- Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
- Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
- Assert.assertEquals(props.get("maxMaps"), "33");
- }
-
- public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
- Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
-
- List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
- Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
- Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
- Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
- Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
- Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
- Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
- Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
- Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
- Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
- }
-
- @Test
- public void testReplicationCoordsForTableStorage() throws Exception {
- OozieFeedMapper feedMapper = new OozieFeedMapper(tableFeed);
- List<COORDINATORAPP> coords = feedMapper.getCoordinators(
- trgCluster, new Path("/projects/falcon/"));
- COORDINATORAPP coord = coords.get(0);
-
- Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
- Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
- coord.getAction().getWorkflow().getAppPath());
- Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
- + srcCluster.getName(), coord.getName());
- Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
-
- SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
- .getDatasetOrAsyncDataset().get(0);
- Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
- Assert.assertEquals("input-dataset", inputDataset.getName());
-
- String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
- sourceRegistry = sourceRegistry.replace("thrift", "hcat");
- Assert.assertEquals(inputDataset.getUriTemplate(),
- sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
- SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
- .getDatasetOrAsyncDataset().get(1);
- Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
- Assert.assertEquals("output-dataset", outputDataset.getName());
-
- String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
- targetRegistry = targetRegistry.replace("thrift", "hcat");
- Assert.assertEquals(outputDataset.getUriTemplate(),
- targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
-
-        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
-        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
- String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
- Assert.assertEquals("input", inEventName);
- Assert.assertEquals("input-dataset", inEventDataset);
- Assert.assertEquals("${now(0,-40)}", inEventInstance);
-
- String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
- Assert.assertEquals("${now(0,-40)}", outEventInstance);
-
- // assert FS staging area
- String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- final FileSystem fs = trgMiniDFS.getFileSystem();
- Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
- Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
- Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
-
- Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
- Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
- Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
-
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
- final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
-
- // verify the replication param that feed replicator depends on
- Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
-
- Assert.assertTrue(props.containsKey("distcpSourcePaths"));
- Assert.assertEquals(props.get("distcpSourcePaths"),
- FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
- + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
- + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
- Assert.assertTrue(props.containsKey("distcpTargetPaths"));
- Assert.assertEquals(props.get("distcpTargetPaths"),
- FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
- + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
- + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
-
- Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
-
- // verify table props
- assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
- assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
-
- // verify the late data params
- Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
- Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
- // verify the post processing params
- Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
- }
-
- private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
- Map<String, String> props, String prefix) {
- Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
- Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
- Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
-
- Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
- Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
- Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
- }
-
- @Test
- public void testRetentionCoords() throws FalconException, JAXBException, IOException {
- org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
- final Calendar instance = Calendar.getInstance();
- instance.roll(Calendar.YEAR, 1);
- cluster.getValidity().setEnd(instance.getTime());
-
- OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
- List<COORDINATORAPP> coords = feedMapper.getCoordinators(srcCluster, new Path("/projects/falcon/"));
- COORDINATORAPP coord = coords.get(0);
-
- Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
- Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
- Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
-
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- String feedDataPath = props.get("feedDataPath");
- String storageType = props.get("falconFeedStorageType");
-
- // verify the param that feed evictor depends on
- Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
-
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- if (feedDataPath != null) {
- Assert.assertEquals(feedDataPath, storage.getUriTemplate()
- .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
- }
-
- if (storageType != null) {
- Assert.assertEquals(storageType, storage.getType().name());
- }
-
- // verify the post processing params
- Assert.assertEquals(props.get("feedNames"), feed.getName());
- Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
-
- assertWorkflowRetries(coord);
- }
-
-}
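The same coverage returns below as OozieFeedWorkflowBuilderTest; the substantive change is the entry point, roughly:

    // before (the deleted test above):
    OozieFeedMapper feedMapper = new OozieFeedMapper(feed);
    List<COORDINATORAPP> coords = feedMapper.getCoordinators(trgCluster, new Path("/projects/falcon/"));

    // after (the new test that follows):
    OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
    List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));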
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
new file mode 100644
index 0000000..182d9cb
--- /dev/null
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -0,0 +1,505 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.converter;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Tag;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.JAVA;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.workflow.OozieFeedWorkflowBuilder;
+import org.apache.falcon.workflow.OozieWorkflowBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for Oozie workflow definition for feed replication & retention.
+ */
+public class OozieFeedWorkflowBuilderTest {
+ private EmbeddedCluster srcMiniDFS;
+ private EmbeddedCluster trgMiniDFS;
+ private final ConfigurationStore store = ConfigurationStore.get();
+ private Cluster srcCluster;
+ private Cluster trgCluster;
+ private Cluster alphaTrgCluster;
+ private Cluster betaTrgCluster;
+ private Feed feed;
+ private Feed tableFeed;
+ private Feed fsReplFeed;
+
+ private static final String SRC_CLUSTER_PATH = "/src-cluster.xml";
+ private static final String TRG_CLUSTER_PATH = "/trg-cluster.xml";
+ private static final String FEED = "/feed.xml";
+ private static final String TABLE_FEED = "/table-replication-feed.xml";
+ private static final String FS_REPLICATION_FEED = "/fs-replication-feed.xml";
+
+ @BeforeClass
+ public void setUpDFS() throws Exception {
+ CurrentUser.authenticate("falcon");
+
+ srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
+ String srcHdfsUrl = srcMiniDFS.getConf().get("fs.default.name");
+
+ trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
+ String trgHdfsUrl = trgMiniDFS.getConf().get("fs.default.name");
+
+ cleanupStore();
+
+ srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
+ trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
+ alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-alpha.xml", trgHdfsUrl);
+ betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/trg-cluster-beta.xml", trgHdfsUrl);
+
+ feed = (Feed) storeEntity(EntityType.FEED, FEED, null);
+ fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED, null);
+ tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED, null);
+ }
+
+ protected Entity storeEntity(EntityType type, String template, String writeEndpoint) throws Exception {
+ Unmarshaller unmarshaller = type.getUnmarshaller();
+ Entity entity = (Entity) unmarshaller
+ .unmarshal(OozieFeedWorkflowBuilderTest.class.getResource(template));
+ store.publish(type, entity);
+
+ if (type == EntityType.CLUSTER) {
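+ // point the cluster's write interface at the embedded DFS and seed the libext jars the assertions expect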
+ Cluster cluster = (Cluster) entity;
+ ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
+ FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
+ fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
+ fs.create(
+ new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
+ }
+ return entity;
+ }
+
+ protected void cleanupStore() throws FalconException {
+ for (EntityType type : EntityType.values()) {
+ Collection<String> entities = store.getEntities(type);
+ for (String entity : entities) {
+ store.remove(type, entity);
+ }
+ }
+ }
+
+ @AfterClass
+ public void stopDFS() {
+ srcMiniDFS.shutdown();
+ trgMiniDFS.shutdown();
+ }
+
+ @Test
+ public void testReplicationCoordsForFSStorage() throws Exception {
+ OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+ List<COORDINATORAPP> coords = builder.getCoordinators(trgCluster, new Path("/projects/falcon/"));
+ //Assert retention coord
+ COORDINATORAPP coord = coords.get(0);
+ assertLibExtensions(coord, "retention");
+
+ //Assert replication coord
+ coord = coords.get(1);
+ Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+ Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION", coord
+ .getAction().getWorkflow().getAppPath());
+ Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+ + srcCluster.getName(), coord.getName());
+ Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+ SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+ .getDatasetOrAsyncDataset().get(0);
+ SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+ .getDatasetOrAsyncDataset().get(1);
+
+ Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+ Assert.assertEquals("input-dataset", inputDataset.getName());
+ Assert.assertEquals(
+ ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+ + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+ inputDataset.getUriTemplate());
+
+ Assert.assertEquals("${coord:minutes(20)}",
+ outputDataset.getFrequency());
+ Assert.assertEquals("output-dataset", outputDataset.getName());
+ Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+ + "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
+ outputDataset.getUriTemplate());
+        String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+        String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+ String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+ Assert.assertEquals("input", inEventName);
+ Assert.assertEquals("input-dataset", inEventDataset);
+ Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+ String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+ Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ // verify the replication param that feed replicator depends on
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
+ Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+
+ Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+ Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+
+ // verify the late data params
+ Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
+ Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
+ Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+
+ // verify the post processing params
+ Assert.assertEquals(props.get("feedNames"), feed.getName());
+ Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+
+ // verify workflow params
+ Assert.assertEquals(props.get("userWorkflowName"), "replication-policy");
+ Assert.assertEquals(props.get("userWorkflowVersion"), "0.5");
+ Assert.assertEquals(props.get("userWorkflowEngine"), "falcon");
+
+ // verify default params
+ Assert.assertEquals(props.get("queueName"), "default");
+ Assert.assertEquals(props.get("jobPriority"), "NORMAL");
+ Assert.assertEquals(props.get("maxMaps"), "5");
+
+ assertLibExtensions(coord, "replication");
+ assertWorkflowRetries(coord);
+ }
+
+ private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
+ WORKFLOWAPP wf = getWorkflowapp(coord);
+ List<Object> actions = wf.getDecisionOrForkOrJoin();
+ for (Object obj : actions) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ String actionName = action.getName();
+ if (OozieWorkflowBuilder.FALCON_ACTIONS.contains(actionName)) {
+ Assert.assertEquals(action.getRetryMax(), "3");
+ Assert.assertEquals(action.getRetryInterval(), "1");
+ }
+ }
+ }
+
+ private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
+ WORKFLOWAPP wf = getWorkflowapp(coord);
+ List<Object> actions = wf.getDecisionOrForkOrJoin();
+ for (Object obj : actions) {
+ if (!(obj instanceof ACTION)) {
+ continue;
+ }
+ ACTION action = (ACTION) obj;
+ List<String> files = null;
+ if (action.getJava() != null) {
+ files = action.getJava().getFile();
+ } else if (action.getPig() != null) {
+ files = action.getPig().getFile();
+ } else if (action.getMapReduce() != null) {
+ files = action.getMapReduce().getFile();
+ }
+ if (files != null) {
+ Assert.assertTrue(files.get(files.size() - 1).endsWith("/projects/falcon/working/libext/FEED/"
+ + lifecycle + "/ext.jar"));
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private WORKFLOWAPP getWorkflowapp(COORDINATORAPP coord) throws JAXBException, IOException {
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+ return ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
+ trgMiniDFS.getFileSystem().open(new Path(wfPath, "workflow.xml")))).getValue();
+ }
+
+ @Test
+ public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
+ OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(fsReplFeed);
+
+ List<COORDINATORAPP> alphaCoords = builder.getCoordinators(alphaTrgCluster, new Path("/alpha/falcon/"));
+ final COORDINATORAPP alphaCoord = alphaCoords.get(0);
+ Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
+ Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
+
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
+ assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+
+ List<COORDINATORAPP> betaCoords = builder.getCoordinators(betaTrgCluster, new Path("/beta/falcon/"));
+ final COORDINATORAPP betaCoord = betaCoords.get(0);
+ Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
+ Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
+
+ pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
+ assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+ }
+
+ private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
+ Feed aFeed) throws FalconException {
+ String srcPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
+ String targetPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
+ targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
+
+ StringBuilder pathsWithPartitions = new StringBuilder();
+ pathsWithPartitions.append("${coord:dataIn('input')}/")
+ .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
+ String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+ parts = StringUtils.stripEnd(parts, "/");
+ return parts;
+ }
+
+ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
+ String pathsWithPartitions) throws JAXBException, IOException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+ Date startDate = feedCluster.getValidity().getStart();
+ Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
+
+ Date endDate = feedCluster.getValidity().getEnd();
+ Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
+
+ WORKFLOWAPP workflow = getWorkflowapp(coord);
+ assertWorkflowDefinition(fsReplFeed, workflow);
+
+ List<Object> actions = workflow.getDecisionOrForkOrJoin();
+ System.out.println("actions = " + actions);
+
+ ACTION replicationActionNode = (ACTION) actions.get(4);
+ Assert.assertEquals(replicationActionNode.getName(), "replication");
+
+ JAVA replication = replicationActionNode.getJava();
+ List<String> args = replication.getArg();
+ Assert.assertEquals(args.size(), 11);
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
+ Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
+ Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
+ Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
+ Assert.assertEquals(props.get("maxMaps"), "33");
+ }
+
+ public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
+ Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(), parentWorkflow.getName());
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+ Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
+ Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
+ Assert.assertEquals("replication-decision", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
+ Assert.assertEquals("table-export", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+ Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+ Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
+ Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+ Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+ Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+ }
+
+ @Test
+ public void testReplicationCoordsForTableStorage() throws Exception {
+ OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(tableFeed);
+ List<COORDINATORAPP> coords = builder.getCoordinators(
+ trgCluster, new Path("/projects/falcon/"));
+ COORDINATORAPP coord = coords.get(0);
+
+ Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
+ Assert.assertEquals("${nameNode}/projects/falcon/REPLICATION",
+ coord.getAction().getWorkflow().getAppPath());
+ Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+ + srcCluster.getName(), coord.getName());
+ Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
+
+ SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
+ .getDatasetOrAsyncDataset().get(0);
+ Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
+ Assert.assertEquals("input-dataset", inputDataset.getName());
+
+ String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
+ sourceRegistry = sourceRegistry.replace("thrift", "hcat");
+ Assert.assertEquals(inputDataset.getUriTemplate(),
+ sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+ SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
+ .getDatasetOrAsyncDataset().get(1);
+ Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
+ Assert.assertEquals("output-dataset", outputDataset.getName());
+
+ String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
+ targetRegistry = targetRegistry.replace("thrift", "hcat");
+ Assert.assertEquals(outputDataset.getUriTemplate(),
+ targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
+
+ String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
+ String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
+ String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
+ Assert.assertEquals("input", inEventName);
+ Assert.assertEquals("input-dataset", inEventDataset);
+ Assert.assertEquals("${now(0,-40)}", inEventInstance);
+
+ String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
+ Assert.assertEquals("${now(0,-40)}", outEventInstance);
+
+ // assert FS staging area
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ final FileSystem fs = trgMiniDFS.getFileSystem();
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
+
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
+ Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
+ final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
+
+ // verify the replication param that feed replicator depends on
+ Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
+
+ Assert.assertTrue(props.containsKey("distcpSourcePaths"));
+ Assert.assertEquals(props.get("distcpSourcePaths"),
+ FeedHelper.getStagingDir(srcCluster, tableFeed, srcStorage, Tag.REPLICATION)
+ + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+ + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+ Assert.assertTrue(props.containsKey("distcpTargetPaths"));
+ Assert.assertEquals(props.get("distcpTargetPaths"),
+ FeedHelper.getStagingDir(trgCluster, tableFeed, trgStorage, Tag.REPLICATION)
+ + "/ds=${coord:dataOutPartitionValue('output', 'ds')}/"
+ + "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}/data");
+
+ Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
+
+ // verify table props
+ assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
+ assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
+
+ // verify the late data params
+ Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
+ Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+ // verify the post processing params
+ Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
+ Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
+ }
+
+ private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+ Map<String, String> props, String prefix) {
+ Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
+ Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
+ Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
+
+ Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
+ Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
+ Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitionFilter('input', 'hive')}");
+ }
+
+ @Test
+ public void testRetentionCoords() throws FalconException, JAXBException, IOException {
+ org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
+ final Calendar instance = Calendar.getInstance();
+ instance.roll(Calendar.YEAR, 1);
+ cluster.getValidity().setEnd(instance.getTime());
+
+ OozieWorkflowBuilder builder = new OozieFeedWorkflowBuilder(feed);
+ List<COORDINATORAPP> coords = builder.getCoordinators(srcCluster, new Path("/projects/falcon/"));
+ COORDINATORAPP coord = coords.get(0);
+
+ Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
+ Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
+ Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
+
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ String feedDataPath = props.get("feedDataPath");
+ String storageType = props.get("falconFeedStorageType");
+
+ // verify the param that feed evictor depends on
+ Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
+
+ final Storage storage = FeedHelper.createStorage(cluster, feed);
+ if (feedDataPath != null) {
+ Assert.assertEquals(feedDataPath, storage.getUriTemplate()
+ .replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+ }
+
+ if (storageType != null) {
+ Assert.assertEquals(storageType, storage.getType().name());
+ }
+
+ // verify the post processing params
+ Assert.assertEquals(props.get("feedNames"), feed.getName());
+ Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+
+ assertWorkflowRetries(coord);
+ }
+}
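Each of the tests above rebuilds the coordinator property map with the same three-line loop. A small helper along the following lines would fold that into one place; this is a sketch only, and the getCoordProperties name is illustrative rather than part of this patch:

    // collect the <property> entries of a coordinator's workflow configuration into a map
    private Map<String, String> getCoordProperties(COORDINATORAPP coord) {
        Map<String, String> props = new HashMap<String, String>();
        for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
            props.put(prop.getName(), prop.getValue());
        }
        return props;
    }

Each call site then shrinks to Map<String, String> props = getCoordProperties(coord); before its assertions.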
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
deleted file mode 100644
index f443939..0000000
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.FalconRuntimException;
-import org.apache.falcon.Tag;
-import org.apache.commons.io.IOUtils;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.ExternalId;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Property;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.ObjectFactory;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.SecurityUtil;
-import org.apache.falcon.service.FalconPathFilter;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.log4j.Logger;
-import org.apache.oozie.client.OozieClient;
-
-import javax.xml.bind.*;
-import java.io.*;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * Entity mapper base class that allows an entity to be mapped to oozie bundle.
- * @param <T>
- */
-public abstract class AbstractOozieEntityMapper<T extends Entity> {
-
- private static final Logger LOG = Logger.getLogger(AbstractOozieEntityMapper.class);
-
- protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
-
- protected static final String ACTUAL_TIME_EL = "${coord:formatTime(coord:actualTime(), 'yyyy-MM-dd-HH-mm')}";
- protected static final Long DEFAULT_BROKER_MSG_TTL = 3 * 24 * 60L;
- protected static final String MR_QUEUE_NAME = "queueName";
- protected static final String MR_JOB_PRIORITY = "jobPriority";
-
- protected static final JAXBContext WORKFLOW_JAXB_CONTEXT;
- protected static final JAXBContext COORD_JAXB_CONTEXT;
- protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
- protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
- public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(new String[] { "recordsize",
- "succeeded-post-processing", "failed-post-processing", }));
-
- protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith("falcon");
- }
-
- @Override
- public String getJarName(Path path) {
- String name = path.getName();
- if (name.endsWith(".jar")) {
- name = name.substring(0, name.indexOf(".jar"));
- }
- return name;
- }
- };
-
- static {
- try {
- WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
- COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
- BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
- HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
- org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
- } catch (JAXBException e) {
- throw new RuntimeException("Unable to create JAXB context", e);
- }
- }
-
- private final T entity;
-
- protected AbstractOozieEntityMapper(T entity) {
- this.entity = entity;
- }
-
- protected T getEntity() {
- return entity;
- }
-
- protected Path getCoordPath(Path bundlePath, String coordName) {
- Tag tag = EntityUtil.getWorkflowNameTag(coordName, getEntity());
- return new Path(bundlePath, tag.name());
- }
-
- protected abstract Map<String, String> getEntityProperties();
-
- public boolean map(Cluster cluster, Path bundlePath) throws FalconException {
- BUNDLEAPP bundleApp = new BUNDLEAPP();
- bundleApp.setName(EntityUtil.getWorkflowName(entity).toString());
- // all the properties are set prior to bundle and coordinators creation
-
- List<COORDINATORAPP> coordinators = getCoordinators(cluster, bundlePath);
- if (coordinators.size() == 0) {
- return false;
- }
- for (COORDINATORAPP coordinatorapp : coordinators) {
- Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
- String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
- EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
- createLogsDir(cluster, coordPath);
- COORDINATOR bundleCoord = new COORDINATOR();
- bundleCoord.setName(coordinatorapp.getName());
- bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
- bundleApp.getCoordinator().add(bundleCoord);
-
- copySharedLibs(cluster, coordPath);
- }
-
- marshal(cluster, bundleApp, bundlePath);
- return true;
- }
-
- private void addExtensionJars(FileSystem fs, Path path, WORKFLOWAPP wf) throws IOException {
- FileStatus[] libs = null;
- try {
- libs = fs.listStatus(path);
- } catch(FileNotFoundException ignore) {
- //Ok if the libext is not configured
- }
-
- if (libs == null) {
- return;
- }
-
- for(FileStatus lib : libs) {
- if (lib.isDir()) {
- continue;
- }
-
- for(Object obj: wf.getDecisionOrForkOrJoin()) {
- if (!(obj instanceof ACTION)) {
- continue;
- }
- ACTION action = (ACTION) obj;
- List<String> files = null;
- if (action.getJava() != null) {
- files = action.getJava().getFile();
- } else if (action.getPig() != null) {
- files = action.getPig().getFile();
- } else if (action.getMapReduce() != null) {
- files = action.getMapReduce().getFile();
- }
- if (files != null) {
- files.add(lib.getPath().toString());
- }
- }
- }
- }
-
- protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, EntityType type, String lifecycle)
- throws IOException, FalconException {
- String libext = ClusterHelper.getLocation(cluster, "working") + "/libext";
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
- addExtensionJars(fs, new Path(libext), wf);
- addExtensionJars(fs, new Path(libext, type.name()), wf);
- if (StringUtils.isNotEmpty(lifecycle)) {
- addExtensionJars(fs, new Path(libext, type.name() + "/" + lifecycle), wf);
- }
- }
-
- private void copySharedLibs(Cluster cluster, Path coordPath) throws FalconException {
- try {
- Path libPath = new Path(coordPath, "lib");
- SharedLibraryHostingService.pushLibsToHDFS(StartupProperties.get().getProperty("system.lib.location"),
- libPath, cluster, FALCON_JAR_FILTER);
- } catch (IOException e) {
- throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
- }
- }
-
- protected abstract List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException;
-
- protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
- org.apache.falcon.oozie.coordinator.CONFIGURATION conf
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
- List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
- for (Entry<String, String> prop : propMap.entrySet()) {
- props.add(createCoordProperty(prop.getKey(), prop.getValue()));
- }
- return conf;
- }
-
- protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
- Map<String, String> props = new HashMap<String, String>();
- props.put(ARG.entityName.getPropName(), entity.getName());
- props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
- props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
- props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
- props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
- String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
- "tcp://localhost:61616?daemon=true");
- props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
- String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
- ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
- props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
- String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
- DEFAULT_BROKER_MSG_TTL.toString());
- props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
- props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
- props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
- props.put(OozieClient.EXTERNAL_ID,
- new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
- "${coord:nominalTime()}").getId());
- props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
- try {
- if (EntityUtil.getLateProcess(entity) == null
- || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
- props.put("shouldRecord", "false");
- } else {
- props.put("shouldRecord", "true");
- }
- } catch (FalconException e) {
- LOG.error("Unable to get Late Process for entity:" + entity, e);
- throw new FalconRuntimException(e);
- }
- props.put("entityName", entity.getName());
- props.put("entityType", entity.getEntityType().name().toLowerCase());
- props.put(ARG.cluster.getPropName(), cluster.getName());
- if (cluster.getProperties() != null) {
- for (Property prop : cluster.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
-
- props.put(MR_QUEUE_NAME, "default");
- props.put(MR_JOB_PRIORITY, "NORMAL");
- //props in entity override the set props.
- props.putAll(getEntityProperties());
- return props;
- }
-
- protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
- String value) {
- org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
- prop.setName(name);
- prop.setValue(value);
- return prop;
- }
-
- protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
- throws FalconException {
- try {
- Marshaller marshaller = jaxbContext.createMarshaller();
- marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
- FileSystem fs = HadoopClientFactory.get().createFileSystem(
- outPath.toUri(), ClusterHelper.getConfiguration(cluster));
- OutputStream out = fs.create(outPath);
- try {
- marshaller.marshal(jaxbElement, out);
- } finally {
- out.close();
- }
- if (LOG.isDebugEnabled()) {
- StringWriter writer = new StringWriter();
- marshaller.marshal(jaxbElement, writer);
- LOG.debug("Writing definition to " + outPath + " on cluster " + cluster.getName());
- LOG.debug(writer.getBuffer());
- }
-
- LOG.info("Marshalled " + jaxbElement.getDeclaredType() + " to " + outPath);
- } catch (Exception e) {
- throw new FalconException("Unable to marshall app object", e);
- }
- }
-
- private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
- try {
- FileSystem fs = HadoopClientFactory.get().createFileSystem(
- coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
- Path logsDir = new Path(coordPath, "../../logs");
- fs.mkdirs(logsDir);
-
- // logs are copied within oozie as the user in Post Processing and hence 777 permissions
- FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
- fs.setPermission(logsDir, permission);
- } catch (Exception e) {
- throw new FalconException("Unable to create temp dir in " + coordPath, e);
- }
- }
-
- protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath, String name) throws FalconException {
- if (StringUtils.isEmpty(name)) {
- name = "coordinator";
- }
- name = name + ".xml";
- marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), COORD_JAXB_CONTEXT, new Path(outPath, name));
- return name;
- }
-
- protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
-
- marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
- BUNDLE_JAXB_CONTEXT,
- new Path(outPath, "bundle.xml"));
- }
-
- protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
-
- marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
- WORKFLOW_JAXB_CONTEXT,
- new Path(outPath, "workflow.xml"));
- }
-
- protected String getStoragePath(Path path) {
- if (path != null) {
- return getStoragePath(path.toString());
- }
- return null;
- }
-
- protected String getStoragePath(String path) {
- if (StringUtils.isNotEmpty(path)) {
- if (new Path(path).toUri().getScheme() == null) {
- path = "${nameNode}" + path;
- }
- }
- return path;
- }
-
- protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked")
- JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
- resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
- }
- }
-
- protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
- InputStream resourceAsStream = null;
- try {
- resourceAsStream = AbstractOozieEntityMapper.class.getResourceAsStream(template);
- Unmarshaller unmarshaller = COORD_JAXB_CONTEXT.createUnmarshaller();
- @SuppressWarnings("unchecked")
- JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>)
- unmarshaller.unmarshal(resourceAsStream);
- return jaxbElement.getValue();
- } catch (JAXBException e) {
- throw new FalconException(e);
- } finally {
- IOUtils.closeQuietly(resourceAsStream);
- }
- }
-
- protected void createHiveConf(FileSystem fs, Path confPath, String metastoreUrl,
- Cluster cluster, String prefix) throws IOException {
- Configuration hiveConf = new Configuration(false);
- hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUrl);
- hiveConf.set("hive.metastore.local", "false");
-
- if (UserGroupInformation.isSecurityEnabled()) {
- hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
- ClusterHelper.getPropertyValue(cluster, SecurityUtil.HIVE_METASTORE_PRINCIPAL));
- hiveConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
- }
-
- OutputStream out = null;
- try {
- out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
- hiveConf.writeXml(out);
- } finally {
- IOUtils.closeQuietly(out);
- }
- }
-
- protected void decorateWithOozieRetries(ACTION action) {
- Properties props = RuntimeProperties.get();
- action.setRetryMax(props.getProperty("falcon.parentworkflow.retry.max", "3"));
- action.setRetryInterval(props.getProperty("falcon.parentworkflow.retry.interval.secs", "1"));
- }
-}
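The retry values asserted by assertWorkflowRetries in the test above are exactly what decorateWithOozieRetries stamps onto each Falcon-owned action, with "3" and "1" as the RuntimeProperties defaults. A minimal sketch of that contract, reusing the types from the code above (the action setup itself is illustrative):

    ACTION action = new ACTION();                 // org.apache.falcon.oozie.workflow.ACTION
    action.setName("succeeded-post-processing");  // one of the FALCON_ACTIONS names
    decorateWithOozieRetries(action);             // applies the retry.max / retry.interval.secs defaults
    Assert.assertEquals(action.getRetryMax(), "3");
    Assert.assertEquals(action.getRetryInterval(), "1");

Raising falcon.parentworkflow.retry.max in the runtime properties would change both the generated workflows and the values the test expects.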
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 2f53370..9e1c82d 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -17,8 +17,20 @@
*/
package org.apache.falcon.util;
+import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.hive.ACTION;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.hadoop.conf.Configuration;
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.Properties;
@@ -27,6 +39,23 @@ import java.util.Properties;
 * Helper methods relating to oozie configuration.
*/
public final class OozieUtils {
+ public static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+ public static final JAXBContext COORD_JAXB_CONTEXT;
+ public static final JAXBContext BUNDLE_JAXB_CONTEXT;
+ protected static final JAXBContext HIVE_ACTION_JAXB_CONTEXT;
+
+ static {
+ try {
+ WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+ COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+ BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
+ HIVE_ACTION_JAXB_CONTEXT = JAXBContext.newInstance(
+ org.apache.falcon.oozie.hive.ACTION.class.getPackage().getName());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to create JAXB context", e);
+ }
+ }
+
private OozieUtils() {}
@@ -39,4 +68,29 @@ public final class OozieUtils {
}
return jobprops;
}
+
+ @SuppressWarnings("unchecked")
+ public static JAXBElement<ACTION> unMarshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction) {
+ try {
+ Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+ unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+ return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+ unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to unmarshall hive action.", e);
+ }
+ }
+
+ public static void marshalHiveAction(org.apache.falcon.oozie.workflow.ACTION wfAction,
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+ try {
+ DOMResult hiveActionDOM = new DOMResult();
+ Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+ marshaller.marshal(actionjaxbElement, hiveActionDOM);
+ wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to marshall hive action.", e);
+ }
+ }
+
}
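A typical use of the two new helpers is a read-modify-write cycle on the hive action embedded in a workflow action's <any> payload; a sketch, assuming wfAction actually wraps a hive action and borrowing a script path from the tests above purely for illustration:

    // pull the embedded hive action out of the workflow action ...
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> hiveElement = OozieUtils.unMarshalHiveAction(wfAction);
    org.apache.falcon.oozie.hive.ACTION hiveAction = hiveElement.getValue();

    // ... adjust it (illustrative value only) ...
    hiveAction.setScript("${wf:appPath()}/scripts/falcon-table-import.hql");

    // ... and marshal it back into the workflow action's <any> slot
    OozieUtils.marshalHiveAction(wfAction, hiveElement);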
[5/5] git commit: FALCON-356 Merge OozieProcessMapper and
OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Posted by sh...@apache.org.
FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/e2545b08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/e2545b08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/e2545b08
Branch: refs/heads/master
Commit: e2545b0874d206f3be88d4b3ac7003eae1161c44
Parents: 5e43521
Author: Shwetha GS <sh...@gmail.com>
Authored: Tue Mar 18 17:10:54 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Tue Mar 18 17:10:54 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/falcon/util/ReflectionUtils.java | 33 +-
.../apache/falcon/workflow/WorkflowBuilder.java | 20 +-
.../apache/falcon/util/ReflectionUtilsTest.java | 49 ++
.../falcon/converter/OozieFeedMapper.java | 596 -------------
.../workflow/OozieFeedWorkflowBuilder.java | 594 ++++++++++++-
.../falcon/converter/OozieFeedMapperTest.java | 505 -----------
.../converter/OozieFeedWorkflowBuilderTest.java | 505 +++++++++++
.../converter/AbstractOozieEntityMapper.java | 428 ----------
.../java/org/apache/falcon/util/OozieUtils.java | 54 ++
.../falcon/workflow/OozieWorkflowBuilder.java | 370 ++++++++
.../workflow/engine/OozieWorkflowEngine.java | 15 +-
.../falcon/converter/OozieProcessMapper.java | 833 -------------------
.../workflow/OozieProcessWorkflowBuilder.java | 822 +++++++++++++++++-
.../OozieProcessMapperLateProcessTest.java | 96 ---
.../converter/OozieProcessMapperTest.java | 557 -------------
.../OozieProcessWorkflowBuilderTest.java | 559 +++++++++++++
.../falcon/retention/FeedEvictorTest.java | 2 +-
18 files changed, 2936 insertions(+), 3104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5748d27..f3ddf96 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -25,6 +25,8 @@ Trunk (Unreleased)
(Venkatesh Seetharam)
IMPROVEMENTS
+ FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. (Shwetha GS)
+
FALCON-355 Remove SLAMonitoringService. (Shwetha GS)
FALCON-333 jsp-api dependency is defined twice. (Jean-Baptiste
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
index 4a00fa9..80022e0 100644
--- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.falcon.util;
import org.apache.falcon.FalconException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
/**
@@ -30,12 +31,11 @@ public final class ReflectionUtils {
private ReflectionUtils() {}
public static <T> T getInstance(String classKey) throws FalconException {
- String clazzName = StartupProperties.get().getProperty(classKey);
- try {
- return ReflectionUtils.<T>getInstanceByClassName(clazzName);
- } catch (FalconException e) {
- throw new FalconException("Unable to get instance for key: " + classKey, e);
- }
+ return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey));
+ }
+
+ public static <T> T getInstance(String classKey, Class<?> argCls, Object arg) throws FalconException {
+ return ReflectionUtils.<T>getInstanceByClassName(StartupProperties.get().getProperty(classKey), argCls, arg);
}
@SuppressWarnings("unchecked")
@@ -52,4 +52,25 @@ public final class ReflectionUtils {
throw new FalconException("Unable to get instance for " + clazzName, e);
}
}
+
+ /**
+ * Invokes constructor with one argument.
+ * @param clazzName - classname
+ * @param argCls - Class of the argument
+ * @param arg - constructor argument
+ * @param <T> - instance type
+ * @return Class instance
+ * @throws FalconException
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T getInstanceByClassName(String clazzName, Class<?> argCls, Object arg) throws
+ FalconException {
+ try {
+ Class<T> clazz = (Class<T>) ReflectionUtils.class.getClassLoader().loadClass(clazzName);
+ Constructor<T> constructor = clazz.getConstructor(argCls);
+ return constructor.newInstance(arg);
+ } catch (Exception e) {
+ throw new FalconException("Unable to get instance for " + clazzName, e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
index 26243e7..1f9a8c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
@@ -22,8 +22,6 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.util.ReflectionUtils;
-import java.util.Date;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -32,16 +30,22 @@ import java.util.Properties;
* @param <T>
*/
public abstract class WorkflowBuilder<T extends Entity> {
+ protected T entity;
+
+ protected WorkflowBuilder(T entity) {
+ this.entity = entity;
+ }
+
+ public T getEntity() {
+ return entity;
+ }
public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
String classKey = engine + "." + entity.getEntityType().name().toLowerCase() + ".workflow.builder";
- return ReflectionUtils.getInstance(classKey);
+ return ReflectionUtils.getInstance(classKey, entity.getEntityType().getEntityClass(), entity);
}
- public abstract Map<String, Properties> newWorkflowSchedule(T entity, List<String> clusters) throws FalconException;
-
- public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user)
- throws FalconException;
+ public abstract Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException;
- public abstract String[] getWorkflowNames(T entity);
+ public abstract String[] getWorkflowNames();
}
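With the entity now injected through the one-argument constructor, getBuilder resolves a startup property named <engine>.<entity type>.workflow.builder and reflectively instantiates the configured class. For a process entity on the oozie engine the key would be oozie.process.workflow.builder, so the wiring and a call look roughly like this (the property file name is an assumption; the class is the builder touched later in this commit):

    # startup.properties
    oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder

    WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder("oozie", entity);
    Map<String, Properties> schedules = builder.newWorkflowSchedule("corp-cluster");

where "corp-cluster" stands in for any cluster name defined on the entity.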
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
new file mode 100644
index 0000000..bc0bce0
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/util/ReflectionUtilsTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.util;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ClusterEntityParser;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests ReflectionUtils.
+ */
+@Test
+public class ReflectionUtilsTest {
+ public void testGetInstance() throws FalconException {
+ //with 1 arg constructor, arg null
+ Object e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class, null);
+ Assert.assertTrue(e instanceof FalconException);
+
+ //with 1 arg constructor, arg not null
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.FalconException", Throwable.class,
+ new Throwable());
+ Assert.assertTrue(e instanceof FalconException);
+
+ //no constructor, using get() method
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.util.StartupProperties");
+ Assert.assertTrue(e instanceof StartupProperties);
+
+ //with empty constructor
+ e = ReflectionUtils.getInstanceByClassName("org.apache.falcon.entity.parser.ClusterEntityParser");
+ Assert.assertTrue(e instanceof ClusterEntityParser);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java b/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
deleted file mode 100644
index 2b3315f..0000000
--- a/feed/src/main/java/org/apache/falcon/converter/OozieFeedMapper.java
+++ /dev/null
@@ -1,596 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.ClusterType;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Property;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
-import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
-import org.apache.falcon.oozie.coordinator.ACTION;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.coordinator.WORKFLOW;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.util.BuildProperties;
-import org.apache.falcon.util.RuntimeProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.*;
-
-/**
- * Mapper which maps feed definition to oozie workflow definitions for
- * replication & retention.
- */
-public class OozieFeedMapper extends AbstractOozieEntityMapper<Feed> {
-
- private static final Logger LOG = Logger.getLogger(OozieFeedMapper.class);
-
- private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
- private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
-
- public OozieFeedMapper(Feed feed) {
- super(feed);
- }
-
- @Override
- protected List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
- List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
- COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
- if (retentionCoord != null) {
- coords.add(retentionCoord);
- }
- List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
- coords.addAll(replicationCoords);
- return coords;
- }
-
- private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
-
- Feed feed = getEntity();
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
-
- if (feedCluster.getValidity().getEnd().before(new Date())) {
- LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
- + " is not in the future");
- return null;
- }
-
- return retentionMapper.getRetentionCoordinator(cluster, bundlePath, feed, feedCluster);
- }
-
- private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
- throws FalconException {
-
- Feed feed = getEntity();
- List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
-
- if (FeedHelper.getCluster(feed, targetCluster.getName()).getType() == ClusterType.TARGET) {
- String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, feed).toString();
- Path basePath = getCoordPath(bundlePath, coordName);
- replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
-
- for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) {
- if (feedCluster.getType() == ClusterType.SOURCE) {
- COORDINATORAPP coord = replicationMapper.createAndGetCoord(feed,
- (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
- targetCluster, bundlePath);
-
- if (coord != null) {
- replicationCoords.add(coord);
- }
- }
- }
- }
-
- return replicationCoords;
- }
-
- @Override
- protected Map<String, String> getEntityProperties() {
- Feed feed = getEntity();
- Map<String, String> props = new HashMap<String, String>();
- if (feed.getProperties() != null) {
- for (Property prop : feed.getProperties().getProperties()) {
- props.put(prop.getName(), prop.getValue());
- }
- }
- return props;
- }
-
- private final class RetentionOozieWorkflowMapper {
-
- private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
-
- private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
- org.apache.falcon.entity.v0.feed.Cluster feedCluster)
- throws FalconException {
-
- COORDINATORAPP retentionApp = new COORDINATORAPP();
- String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
- retentionApp.setName(coordName);
- retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
- retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
- retentionApp.setTimezone(feed.getTimezone().getID());
- TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
- if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
- retentionApp.setFrequency("${coord:hours(6)}");
- } else {
- retentionApp.setFrequency("${coord:days(1)}");
- }
-
- Path wfPath = getCoordPath(bundlePath, coordName);
- retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
- return retentionApp;
- }
-
- private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
- throws FalconException {
- Feed feed = getEntity();
- ACTION retentionAction = new ACTION();
- WORKFLOW retentionWorkflow = new WORKFLOW();
- createRetentionWorkflow(cluster, wfPath, wfName);
- retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
-
- Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
- props.put("timeZone", feed.getTimezone().getID());
- props.put("frequency", feed.getFrequency().getTimeUnit().name());
-
- final Storage storage = FeedHelper.createStorage(cluster, feed);
- props.put("falconFeedStorageType", storage.getType().name());
-
- String feedDataPath = storage.getUriTemplate();
- props.put("feedDataPath",
- feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster =
- FeedHelper.getCluster(feed, cluster.getName());
- props.put("limit", feedCluster.getRetention().getLimit().toString());
-
- props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
- props.put(ARG.feedNames.getPropName(), feed.getName());
- props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
-
- props.put("falconInputFeeds", feed.getName());
- props.put("falconInPaths", "IGNORE");
-
- propagateUserWorkflowProperties(props, "eviction");
-
- retentionWorkflow.setConfiguration(getCoordConfig(props));
- retentionAction.setWorkflow(retentionWorkflow);
-
- return retentionAction;
- }
-
- private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
- try {
- WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
- retWfApp.setName(wfName);
- addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
- addOozieRetries(retWfApp);
- marshal(cluster, retWfApp, wfPath);
- } catch(IOException e) {
- throw new FalconException("Unable to create retention workflow", e);
- }
- }
- }
-
- private class ReplicationOozieWorkflowMapper {
- private static final String MR_MAX_MAPS = "maxMaps";
-
- private static final int THIRTY_MINUTES = 30 * 60 * 1000;
-
- private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
- private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
-
- private static final String TIMEOUT = "timeout";
- private static final String PARALLEL = "parallel";
-
- private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
- throws FalconException {
- try {
- WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
- repWFapp.setName(wfName);
- addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
- addOozieRetries(repWFapp);
- marshal(cluster, repWFapp, wfPath);
- } catch(IOException e) {
- throw new FalconException("Unable to create replication workflow", e);
- }
-
- }
-
- private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
- Path bundlePath) throws FalconException {
- long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
- Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
- Date sourceEndDate = getEndDate(feed, srcCluster);
-
- Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
- Date targetEndDate = getEndDate(feed, trgCluster);
-
- if (noOverlapExists(sourceStartDate, sourceEndDate,
- targetStartDate, targetEndDate)) {
- LOG.warn("Not creating replication coordinator, as the source cluster:"
- + srcCluster.getName()
- + " and target cluster: "
- + trgCluster.getName()
- + " do not have overlapping dates");
- return null;
- }
-
- COORDINATORAPP replicationCoord;
- try {
- replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
- } catch (FalconException e) {
- throw new FalconException("Cannot unmarshall replication coordinator template", e);
- }
-
- String coordName = EntityUtil.getWorkflowName(
- Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
- String start = sourceStartDate.after(targetStartDate)
- ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
- String end = sourceEndDate.before(targetEndDate)
- ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
-
- initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
- setCoordControls(feed, replicationCoord);
-
- final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
- initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
-
- final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
- initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
-
- Path wfPath = getCoordPath(bundlePath, coordName);
- ACTION replicationWorkflowAction = getReplicationWorkflowAction(
- srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
- replicationCoord.setAction(replicationWorkflowAction);
-
- return replicationCoord;
- }
-
- private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
- Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
- return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
- }
-
- private Date getEndDate(Feed feed, Cluster cluster) {
- return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
- }
-
- private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
- Date targetStartDate, Date targetEndDate) {
- return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
- }
-
- private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
- Feed feed, String start, String end, long delayInMillis) {
- replicationCoord.setName(coordName);
- replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
- if (delayInMillis > 0) {
- long delayInMins = -1 * delayInMillis / (1000 * 60);
- String elExp = "${now(0," + delayInMins + ")}";
-
- replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
- replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
- }
-
- replicationCoord.setStart(start);
- replicationCoord.setEnd(end);
- replicationCoord.setTimezone(feed.getTimezone().getID());
- }
-
- private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
- Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
- long delayInMillis = 0;
- if (replicationDelay != null) {
- delayInMillis = ExpressionHelper.get().evaluate(
- replicationDelay.toString(), Long.class);
- }
-
- return delayInMillis;
- }
-
- private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
- long frequencyInMillis = ExpressionHelper.get().evaluate(
- feed.getFrequency().toString(), Long.class);
- long timeoutInMillis = frequencyInMillis * 6;
- if (timeoutInMillis < THIRTY_MINUTES) {
- timeoutInMillis = THIRTY_MINUTES;
- }
-
- Map<String, String> props = getEntityProperties();
- String timeout = props.get(TIMEOUT);
- if (timeout != null) {
- try {
- timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
- } catch (Exception ignore) {
- LOG.error("Unable to evaluate timeout:", ignore);
- }
- }
- replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
- replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
-
- String parallelProp = props.get(PARALLEL);
- int parallel = 1;
- if (parallelProp != null) {
- try {
- parallel = Integer.parseInt(parallelProp);
- } catch (NumberFormatException ignore) {
- LOG.error("Unable to parse parallel:", ignore);
- }
- }
- replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
- }
-
- private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
- Storage sourceStorage) throws FalconException {
- SYNCDATASET inputDataset = (SYNCDATASET)
- replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
- String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
- if (sourceStorage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- inputDataset.setUriTemplate(uriTemplate);
-
- setDatasetValues(inputDataset, feed, srcCluster);
-
- if (feed.getAvailabilityFlag() == null) {
- inputDataset.setDoneFlag("");
- } else {
- inputDataset.setDoneFlag(feed.getAvailabilityFlag());
- }
- }
-
- private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
- Storage targetStorage) throws FalconException {
- SYNCDATASET outputDataset = (SYNCDATASET)
- replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
-
- String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
- if (targetStorage.getType() == Storage.TYPE.TABLE) {
- uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
- }
- outputDataset.setUriTemplate(uriTemplate);
-
- setDatasetValues(outputDataset, feed, targetCluster);
- }
-
- private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
- dataset.setInitialInstance(SchemaHelper.formatDateUTC(
- FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
- dataset.setTimezone(feed.getTimezone().getID());
- dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
- }
-
- private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
- String wfName, Storage sourceStorage,
- Storage targetStorage) throws FalconException {
- ACTION replicationAction = new ACTION();
- WORKFLOW replicationWF = new WORKFLOW();
- try {
- replicationWF.setAppPath(getStoragePath(wfPath.toString()));
- Feed feed = getEntity();
-
- Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
- props.put("srcClusterName", srcCluster.getName());
- props.put("srcClusterColo", srcCluster.getColo());
- if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
- props.put(MR_MAX_MAPS, getDefaultMaxMaps());
- }
-
- // the storage type is uniform across source and target feeds for replication
- props.put("falconFeedStorageType", sourceStorage.getType().name());
-
- String instancePaths = null;
- if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
- String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
- instancePaths = pathsWithPartitions;
-
- propagateFileSystemCopyProperties(pathsWithPartitions, props);
- } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
- instancePaths = "${coord:dataIn('input')}";
-
- final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
- propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
- final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
- propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
- propagateTableCopyProperties(srcCluster, sourceTableStorage,
- trgCluster, targetTableStorage, props);
- setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
- }
-
- propagateLateDataProperties(feed, instancePaths, sourceStorage.getType().name(), props);
- propagateUserWorkflowProperties(props, "replication");
-
- replicationWF.setConfiguration(getCoordConfig(props));
- replicationAction.setWorkflow(replicationWF);
-
- } catch (Exception e) {
- throw new FalconException("Unable to create replication workflow", e);
- }
-
- return replicationAction;
- }
-
- private String getDefaultMaxMaps() {
- return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
- }
-
- private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
- Feed feed) throws FalconException {
- String srcPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
- srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
-
- String targetPart = FeedHelper.normalizePartitionExpression(
- FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
- targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
-
- StringBuilder pathsWithPartitions = new StringBuilder();
- pathsWithPartitions.append("${coord:dataIn('input')}/")
- .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
-
- String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
- parts = StringUtils.stripEnd(parts, "/");
- return parts;
- }
-
- private void propagateFileSystemCopyProperties(String pathsWithPartitions,
- Map<String, String> props) throws FalconException {
- props.put("sourceRelativePaths", pathsWithPartitions);
-
- props.put("distcpSourcePaths", "${coord:dataIn('input')}");
- props.put("distcpTargetPaths", "${coord:dataOut('output')}");
- }
-
- private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
- Map<String, String> props, String prefix) {
- props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
- props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
- props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
-
- props.put(prefix + "Database", tableStorage.getDatabase());
- props.put(prefix + "Table", tableStorage.getTable());
- props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
- }
-
- private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
- Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
- throws IOException, FalconException {
- Configuration conf = ClusterHelper.getConfiguration(trgCluster);
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
-
- // copy import export scripts to stagingDir
- Path scriptPath = new Path(wfPath, "scripts");
- copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
- copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
-
- // create hive conf to stagingDir
- Path confPath = new Path(wfPath + "/conf");
- createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
- createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
- }
-
- private void copyHiveScript(FileSystem fs, Path scriptPath,
- String localScriptPath, String scriptName) throws IOException {
- OutputStream out = null;
- InputStream in = null;
- try {
- out = fs.create(new Path(scriptPath, scriptName));
- in = OozieFeedMapper.class.getResourceAsStream(localScriptPath + scriptName);
- IOUtils.copy(in, out);
- } finally {
- IOUtils.closeQuietly(in);
- IOUtils.closeQuietly(out);
- }
- }
-
- private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
- Cluster trgCluster, CatalogStorage targetStorage,
- Map<String, String> props) {
- // create staging dirs for export at source & set it as distcpSourcePaths
- String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
- String sourceStagingDir =
- FeedHelper.getStagingDir(srcCluster, getEntity(), sourceStorage, Tag.REPLICATION)
- + "/" + sourceDatedPartitionKey
- + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
- props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
- // create staging dirs for import at target & set it as distcpTargetPaths
- String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
- String targetStagingDir =
- FeedHelper.getStagingDir(trgCluster, getEntity(), targetStorage, Tag.REPLICATION)
- + "/" + targetDatedPartitionKey
- + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
- props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
-
- props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
- }
-
- private void propagateLateDataProperties(Feed feed, String instancePaths,
- String falconFeedStorageType, Map<String, String> props) {
- // todo these pairs are the same but used in different context
- // late data handler - should-record action
- props.put("falconInputFeeds", feed.getName());
- props.put("falconInPaths", instancePaths);
-
- // storage type for each corresponding feed - in this case only one feed is involved
- // needed to compute usage based on storage type in LateDataHandler
- props.put("falconInputFeedStorageTypes", falconFeedStorageType);
-
- // falcon post processing
- props.put(ARG.feedNames.getPropName(), feed.getName());
- props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
- }
- }
-
- private void addOozieRetries(WORKFLOWAPP workflow) {
- for (Object object : workflow.getDecisionOrForkOrJoin()) {
- if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
- continue;
- }
- org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
- String actionName = action.getName();
- if (FALCON_ACTIONS.contains(actionName)) {
- decorateWithOozieRetries(action);
- }
- }
- }
-
- private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
- props.put("userWorkflowName", policy + "-policy");
- props.put("userWorkflowEngine", "falcon");
-
- String version;
- try {
- version = BuildProperties.get().getProperty("build.version");
- } catch (Exception e) { // unfortunate that this is only available in prism/webapp
- version = "0.5";
- }
- props.put("userWorkflowVersion", version);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index 5e3a30e..2008c2d 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -18,60 +18,83 @@
package org.apache.falcon.workflow;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
-import org.apache.falcon.converter.AbstractOozieEntityMapper;
-import org.apache.falcon.converter.OozieFeedMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.messaging.EntityInstanceMessage.EntityOps;
+import org.apache.falcon.oozie.coordinator.ACTION;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.BuildProperties;
+import org.apache.falcon.util.RuntimeProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
-import java.util.*;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
* Workflow definition builder for feed replication & retention.
*/
public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
+ private static final Logger LOG = Logger.getLogger(OozieFeedWorkflowBuilder.class);
+
+ public OozieFeedWorkflowBuilder(Feed entity) {
+ super(entity);
+ }
@Override
- public Map<String, Properties> newWorkflowSchedule(Feed feed, List<String> clusters) throws FalconException {
+ public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
for (String clusterName : clusters) {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
- Properties properties = newWorkflowSchedule(feed, feedCluster.getValidity().getStart(), clusterName,
- CurrentUser.getUser());
- if (properties == null) {
- continue;
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, clusterName);
+ if (!feedCluster.getValidity().getStart().before(feedCluster.getValidity().getEnd())) {
+ LOG.info("feed validity start <= end for cluster " + clusterName + ". Skipping schedule");
+ break;
}
- propertiesMap.put(clusterName, properties);
- }
- return propertiesMap;
- }
- @Override
- public Properties newWorkflowSchedule(Feed feed, Date startDate, String clusterName, String user)
- throws FalconException {
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, clusterName);
- if (!startDate.before(feedCluster.getValidity().getEnd())) {
- return null;
- }
+ Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
+ Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
- Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, feedCluster.getName());
- Path bundlePath = EntityUtil.getNewStagingPath(cluster, feed);
- Feed feedClone = (Feed) feed.copy();
- EntityUtil.setStartDate(feedClone, clusterName, startDate);
-
- AbstractOozieEntityMapper<Feed> mapper = new OozieFeedMapper(feedClone);
- if (!mapper.map(cluster, bundlePath)) {
- return null;
+ if (!map(cluster, bundlePath)) {
+ break;
+ }
+ propertiesMap.put(clusterName, createAppProperties(clusterName, bundlePath, CurrentUser.getUser()));
}
- return createAppProperties(clusterName, bundlePath, user);
+ return propertiesMap;
}
@Override
@@ -82,9 +105,518 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
}
@Override
- public String[] getWorkflowNames(Feed entity) {
+ public String[] getWorkflowNames() {
return new String[]{
EntityUtil.getWorkflowName(Tag.RETENTION, entity).toString(),
EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString(), };
}
+
+ private final RetentionOozieWorkflowMapper retentionMapper = new RetentionOozieWorkflowMapper();
+ private final ReplicationOozieWorkflowMapper replicationMapper = new ReplicationOozieWorkflowMapper();
+
+ @Override
+ public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+ List<COORDINATORAPP> coords = new ArrayList<COORDINATORAPP>();
+ COORDINATORAPP retentionCoord = getRetentionCoordinator(cluster, bundlePath);
+ if (retentionCoord != null) {
+ coords.add(retentionCoord);
+ }
+ List<COORDINATORAPP> replicationCoords = getReplicationCoordinators(cluster, bundlePath);
+ coords.addAll(replicationCoords);
+ return coords;
+ }
+
+ private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+
+ if (feedCluster.getValidity().getEnd().before(new Date())) {
+ LOG.warn("Feed Retention is not applicable as Feed's end time for cluster " + cluster.getName()
+ + " is not in the future");
+ return null;
+ }
+
+ return retentionMapper.getRetentionCoordinator(cluster, bundlePath, entity, feedCluster);
+ }
+
+ private List<COORDINATORAPP> getReplicationCoordinators(Cluster targetCluster, Path bundlePath)
+ throws FalconException {
+ List<COORDINATORAPP> replicationCoords = new ArrayList<COORDINATORAPP>();
+
+ if (FeedHelper.getCluster(entity, targetCluster.getName()).getType() == ClusterType.TARGET) {
+ String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
+ Path basePath = getCoordPath(bundlePath, coordName);
+ replicationMapper.createReplicatonWorkflow(targetCluster, basePath, coordName);
+
+ for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
+ if (feedCluster.getType() == ClusterType.SOURCE) {
+ COORDINATORAPP coord = replicationMapper.createAndGetCoord(entity,
+ (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName()),
+ targetCluster, bundlePath);
+
+ if (coord != null) {
+ replicationCoords.add(coord);
+ }
+ }
+ }
+ }
+
+ return replicationCoords;
+ }
+
+ @Override
+ protected Map<String, String> getEntityProperties() {
+ Map<String, String> props = new HashMap<String, String>();
+ if (entity.getProperties() != null) {
+ for (Property prop : entity.getProperties().getProperties()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ }
+ return props;
+ }
+
+ private final class RetentionOozieWorkflowMapper {
+
+ private static final String RETENTION_WF_TEMPLATE = "/config/workflow/retention-workflow.xml";
+
+ private COORDINATORAPP getRetentionCoordinator(Cluster cluster, Path bundlePath, Feed feed,
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster) throws FalconException {
+ COORDINATORAPP retentionApp = new COORDINATORAPP();
+ String coordName = EntityUtil.getWorkflowName(Tag.RETENTION, feed).toString();
+ retentionApp.setName(coordName);
+ retentionApp.setEnd(SchemaHelper.formatDateUTC(feedCluster.getValidity().getEnd()));
+ retentionApp.setStart(SchemaHelper.formatDateUTC(new Date()));
+ retentionApp.setTimezone(feed.getTimezone().getID());
+ TimeUnit timeUnit = feed.getFrequency().getTimeUnit();
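+ // evict every 6 hours for minutely/hourly feeds, once a day for coarser frequencies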
+ if (timeUnit == TimeUnit.hours || timeUnit == TimeUnit.minutes) {
+ retentionApp.setFrequency("${coord:hours(6)}");
+ } else {
+ retentionApp.setFrequency("${coord:days(1)}");
+ }
+
+ Path wfPath = getCoordPath(bundlePath, coordName);
+ retentionApp.setAction(getRetentionWorkflowAction(cluster, wfPath, coordName));
+ return retentionApp;
+ }
+
+ private ACTION getRetentionWorkflowAction(Cluster cluster, Path wfPath, String wfName)
+ throws FalconException {
+ ACTION retentionAction = new ACTION();
+ WORKFLOW retentionWorkflow = new WORKFLOW();
+ createRetentionWorkflow(cluster, wfPath, wfName);
+ retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
+
+ Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+ props.put("timeZone", entity.getTimezone().getID());
+ props.put("frequency", entity.getFrequency().getTimeUnit().name());
+
+ final Storage storage = FeedHelper.createStorage(cluster, entity);
+ props.put("falconFeedStorageType", storage.getType().name());
+
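+ // mask the ${...} EL prefix as ?{...} so Oozie does not resolve the pattern itself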
+ String feedDataPath = storage.getUriTemplate();
+ props.put("feedDataPath",
+ feedDataPath.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
+ props.put("limit", feedCluster.getRetention().getLimit().toString());
+
+ props.put(ARG.operation.getPropName(), EntityOps.DELETE.name());
+ props.put(ARG.feedNames.getPropName(), entity.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+
+ props.put("falconInputFeeds", entity.getName());
+ props.put("falconInPaths", "IGNORE");
+
+ propagateUserWorkflowProperties(props, "eviction");
+
+ retentionWorkflow.setConfiguration(getCoordConfig(props));
+ retentionAction.setWorkflow(retentionWorkflow);
+ return retentionAction;
+ }
+
+ private void createRetentionWorkflow(Cluster cluster, Path wfPath, String wfName) throws FalconException {
+ try {
+ WORKFLOWAPP retWfApp = getWorkflowTemplate(RETENTION_WF_TEMPLATE);
+ retWfApp.setName(wfName);
+ addLibExtensionsToWorkflow(cluster, retWfApp, EntityType.FEED, "retention");
+ addOozieRetries(retWfApp);
+ marshal(cluster, retWfApp, wfPath);
+ } catch (IOException e) {
+ throw new FalconException("Unable to create retention workflow", e);
+ }
+ }
+ }
+
+ private class ReplicationOozieWorkflowMapper {
+ private static final String MR_MAX_MAPS = "maxMaps";
+
+ private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+ private static final String REPLICATION_COORD_TEMPLATE = "/config/coordinator/replication-coordinator.xml";
+ private static final String REPLICATION_WF_TEMPLATE = "/config/workflow/replication-workflow.xml";
+
+ private static final String TIMEOUT = "timeout";
+ private static final String PARALLEL = "parallel";
+
+ private void createReplicatonWorkflow(Cluster cluster, Path wfPath, String wfName)
+ throws FalconException {
+ try {
+ WORKFLOWAPP repWFapp = getWorkflowTemplate(REPLICATION_WF_TEMPLATE);
+ repWFapp.setName(wfName);
+ addLibExtensionsToWorkflow(cluster, repWFapp, EntityType.FEED, "replication");
+ addOozieRetries(repWFapp);
+ marshal(cluster, repWFapp, wfPath);
+ } catch (IOException e) {
+ throw new FalconException("Unable to create replication workflow", e);
+ }
+
+ }
+
+ private COORDINATORAPP createAndGetCoord(Feed feed, Cluster srcCluster, Cluster trgCluster,
+ Path bundlePath) throws FalconException {
+ long replicationDelayInMillis = getReplicationDelayInMillis(feed, srcCluster);
+ Date sourceStartDate = getStartDate(feed, srcCluster, replicationDelayInMillis);
+ Date sourceEndDate = getEndDate(feed, srcCluster);
+
+ Date targetStartDate = getStartDate(feed, trgCluster, replicationDelayInMillis);
+ Date targetEndDate = getEndDate(feed, trgCluster);
+
+ if (noOverlapExists(sourceStartDate, sourceEndDate,
+ targetStartDate, targetEndDate)) {
+ LOG.warn("Not creating replication coordinator, as the source cluster:" + srcCluster.getName()
+ + "and target cluster: " + trgCluster.getName() + " do not have overlapping dates");
+ return null;
+ }
+
+ COORDINATORAPP replicationCoord;
+ try {
+ replicationCoord = getCoordinatorTemplate(REPLICATION_COORD_TEMPLATE);
+ } catch (FalconException e) {
+ throw new FalconException("Cannot unmarshall replication coordinator template", e);
+ }
+
+ String coordName = EntityUtil.getWorkflowName(
+ Tag.REPLICATION, Arrays.asList(srcCluster.getName()), feed).toString();
+ String start = sourceStartDate.after(targetStartDate)
+ ? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
+ String end = sourceEndDate.before(targetEndDate)
+ ? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
+
+ initializeCoordAttributes(replicationCoord, coordName, feed, start, end, replicationDelayInMillis);
+ setCoordControls(feed, replicationCoord);
+
+ final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, feed);
+ initializeInputDataSet(feed, srcCluster, replicationCoord, sourceStorage);
+
+ final Storage targetStorage = FeedHelper.createStorage(trgCluster, feed);
+ initializeOutputDataSet(feed, trgCluster, replicationCoord, targetStorage);
+
+ Path wfPath = getCoordPath(bundlePath, coordName);
+ ACTION replicationWorkflowAction = getReplicationWorkflowAction(
+ srcCluster, trgCluster, wfPath, coordName, sourceStorage, targetStorage);
+ replicationCoord.setAction(replicationWorkflowAction);
+
+ return replicationCoord;
+ }
+
+ private Date getStartDate(Feed feed, Cluster cluster, long replicationDelayInMillis) {
+ Date startDate = FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart();
+ return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
+ }
+
+ private Date getEndDate(Feed feed, Cluster cluster) {
+ return FeedHelper.getCluster(feed, cluster.getName()).getValidity().getEnd();
+ }
+
+ private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
+ Date targetStartDate, Date targetEndDate) {
+ return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
+ }
+
+ private void initializeCoordAttributes(COORDINATORAPP replicationCoord, String coordName,
+ Feed feed, String start, String end, long delayInMillis) {
+ replicationCoord.setName(coordName);
+ replicationCoord.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
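+ // a positive replication delay shifts the input/output instances back via the now() EL function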
+ if (delayInMillis > 0) {
+ long delayInMins = -1 * delayInMillis / (1000 * 60);
+ String elExp = "${now(0," + delayInMins + ")}";
+
+ replicationCoord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
+ replicationCoord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
+ }
+
+ replicationCoord.setStart(start);
+ replicationCoord.setEnd(end);
+ replicationCoord.setTimezone(feed.getTimezone().getID());
+ }
+
+ private long getReplicationDelayInMillis(Feed feed, Cluster srcCluster) throws FalconException {
+ Frequency replicationDelay = FeedHelper.getCluster(feed, srcCluster.getName()).getDelay();
+ long delayInMillis = 0;
+ if (replicationDelay != null) {
+ delayInMillis = ExpressionHelper.get().evaluate(
+ replicationDelay.toString(), Long.class);
+ }
+
+ return delayInMillis;
+ }
+
+ private void setCoordControls(Feed feed, COORDINATORAPP replicationCoord) throws FalconException {
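+ // default timeout is 6x the feed frequency, floored at 30 minutes; the 'timeout' entity property overrides it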
+ long frequencyInMillis = ExpressionHelper.get().evaluate(
+ feed.getFrequency().toString(), Long.class);
+ long timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+
+ Map<String, String> props = getEntityProperties();
+ String timeout = props.get(TIMEOUT);
+ if (timeout != null) {
+ try {
+ timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
+ } catch (Exception ignore) {
+ LOG.error("Unable to evaluate timeout:", ignore);
+ }
+ }
+ replicationCoord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+ replicationCoord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+
+ String parallelProp = props.get(PARALLEL);
+ int parallel = 1;
+ if (parallelProp != null) {
+ try {
+ parallel = Integer.parseInt(parallelProp);
+ } catch (NumberFormatException ignore) {
+ LOG.error("Unable to parse parallel:", ignore);
+ }
+ }
+ replicationCoord.getControls().setConcurrency(String.valueOf(parallel));
+ }
+
+ private void initializeInputDataSet(Feed feed, Cluster srcCluster, COORDINATORAPP replicationCoord,
+ Storage sourceStorage) throws FalconException {
+ SYNCDATASET inputDataset = (SYNCDATASET)
+ replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(0);
+
+ String uriTemplate = sourceStorage.getUriTemplate(LocationType.DATA);
+ if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ inputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(inputDataset, feed, srcCluster);
+
+ if (feed.getAvailabilityFlag() == null) {
+ inputDataset.setDoneFlag("");
+ } else {
+ inputDataset.setDoneFlag(feed.getAvailabilityFlag());
+ }
+ }
+
+ private void initializeOutputDataSet(Feed feed, Cluster targetCluster, COORDINATORAPP replicationCoord,
+ Storage targetStorage) throws FalconException {
+ SYNCDATASET outputDataset = (SYNCDATASET)
+ replicationCoord.getDatasets().getDatasetOrAsyncDataset().get(1);
+
+ String uriTemplate = targetStorage.getUriTemplate(LocationType.DATA);
+ if (targetStorage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ outputDataset.setUriTemplate(uriTemplate);
+
+ setDatasetValues(outputDataset, feed, targetCluster);
+ }
+
+ private void setDatasetValues(SYNCDATASET dataset, Feed feed, Cluster cluster) {
+ dataset.setInitialInstance(SchemaHelper.formatDateUTC(
+ FeedHelper.getCluster(feed, cluster.getName()).getValidity().getStart()));
+ dataset.setTimezone(feed.getTimezone().getID());
+ dataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+ }
+
+ private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path wfPath,
+ String wfName, Storage sourceStorage,
+ Storage targetStorage) throws FalconException {
+ ACTION replicationAction = new ACTION();
+ WORKFLOW replicationWF = new WORKFLOW();
+ try {
+ replicationWF.setAppPath(getStoragePath(wfPath.toString()));
+ Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+ props.put("srcClusterName", srcCluster.getName());
+ props.put("srcClusterColo", srcCluster.getColo());
+ if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
+ props.put(MR_MAX_MAPS, getDefaultMaxMaps());
+ }
+
+ // the storage type is uniform across source and target feeds for replication
+ props.put("falconFeedStorageType", sourceStorage.getType().name());
+
+ String instancePaths = null;
+ if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
+ String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, entity);
+ instancePaths = pathsWithPartitions;
+
+ propagateFileSystemCopyProperties(pathsWithPartitions, props);
+ } else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
+ instancePaths = "${coord:dataIn('input')}";
+
+ final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
+ propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
+ final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
+ propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
+ propagateTableCopyProperties(srcCluster, sourceTableStorage,
+ trgCluster, targetTableStorage, props);
+ setupHiveConfiguration(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, wfPath);
+ }
+
+ propagateLateDataProperties(entity, instancePaths, sourceStorage.getType().name(), props);
+ propagateUserWorkflowProperties(props, "replication");
+
+ replicationWF.setConfiguration(getCoordConfig(props));
+ replicationAction.setWorkflow(replicationWF);
+
+ } catch (Exception e) {
+ throw new FalconException("Unable to create replication workflow", e);
+ }
+
+ return replicationAction;
+ }
+
+ private String getDefaultMaxMaps() {
+ return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
+ }
+
+ private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster,
+ Feed feed) throws FalconException {
+ String srcPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(feed, srcCluster.getName()).getPartition());
+ srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
+
+ String targetPart = FeedHelper.normalizePartitionExpression(
+ FeedHelper.getCluster(feed, trgCluster.getName()).getPartition());
+ targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
+
+ StringBuilder pathsWithPartitions = new StringBuilder();
+ pathsWithPartitions.append("${coord:dataIn('input')}/")
+ .append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
+
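+ // collapse duplicate slashes and strip any trailing slash from the assembled path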
+ String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
+ parts = StringUtils.stripEnd(parts, "/");
+ return parts;
+ }
+
+ private void propagateFileSystemCopyProperties(String pathsWithPartitions,
+ Map<String, String> props) throws FalconException {
+ props.put("sourceRelativePaths", pathsWithPartitions);
+
+ props.put("distcpSourcePaths", "${coord:dataIn('input')}");
+ props.put("distcpTargetPaths", "${coord:dataOut('output')}");
+ }
+
+ private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
+ Map<String, String> props, String prefix) {
+ props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
+ props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
+ props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
+
+ props.put(prefix + "Database", tableStorage.getDatabase());
+ props.put(prefix + "Table", tableStorage.getTable());
+ props.put(prefix + "Partition", "${coord:dataInPartitionFilter('input', 'hive')}");
+ }
+
+ private void setupHiveConfiguration(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage, Path wfPath)
+ throws IOException, FalconException {
+ Configuration conf = ClusterHelper.getConfiguration(trgCluster);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(conf);
+
+ // copy import export scripts to stagingDir
+ Path scriptPath = new Path(wfPath, "scripts");
+ copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-export.hql");
+ copyHiveScript(fs, scriptPath, "/config/workflow/", "falcon-table-import.hql");
+
+ // create hive conf to stagingDir
+ Path confPath = new Path(wfPath + "/conf");
+ createHiveConf(fs, confPath, sourceStorage.getCatalogUrl(), srcCluster, "falcon-source-");
+ createHiveConf(fs, confPath, targetStorage.getCatalogUrl(), trgCluster, "falcon-target-");
+ }
+
+ private void copyHiveScript(FileSystem fs, Path scriptPath,
+ String localScriptPath, String scriptName) throws IOException {
+ OutputStream out = null;
+ InputStream in = null;
+ try {
+ out = fs.create(new Path(scriptPath, scriptName));
+ in = OozieFeedWorkflowBuilder.class.getResourceAsStream(localScriptPath + scriptName);
+ IOUtils.copy(in, out);
+ } finally {
+ IOUtils.closeQuietly(in);
+ IOUtils.closeQuietly(out);
+ }
+ }
+
+ private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
+ Cluster trgCluster, CatalogStorage targetStorage,
+ Map<String, String> props) {
+ // create staging dirs for export at source & set it as distcpSourcePaths
+ String sourceDatedPartitionKey = sourceStorage.getDatedPartitionKey();
+ String sourceStagingDir =
+ FeedHelper.getStagingDir(srcCluster, entity, sourceStorage, Tag.REPLICATION)
+ + "/" + sourceDatedPartitionKey
+ + "=${coord:dataOutPartitionValue('output', '" + sourceDatedPartitionKey + "')}";
+ props.put("distcpSourcePaths", sourceStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+ // create staging dirs for import at target & set it as distcpTargetPaths
+ String targetDatedPartitionKey = targetStorage.getDatedPartitionKey();
+ String targetStagingDir =
+ FeedHelper.getStagingDir(trgCluster, entity, targetStorage, Tag.REPLICATION)
+ + "/" + targetDatedPartitionKey
+ + "=${coord:dataOutPartitionValue('output', '" + targetDatedPartitionKey + "')}";
+ props.put("distcpTargetPaths", targetStagingDir + "/" + NOMINAL_TIME_EL + "/data");
+
+ props.put("sourceRelativePaths", "IGNORE"); // this will bot be used for Table storage.
+ }
+
+ private void propagateLateDataProperties(Feed feed, String instancePaths,
+ String falconFeedStorageType, Map<String, String> props) {
+ // todo these pairs are the same but used in different context
+ // late data handler - should-record action
+ props.put("falconInputFeeds", feed.getName());
+ props.put("falconInPaths", instancePaths);
+
+ // storage type for each corresponding feed - in this case only one feed is involved
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", falconFeedStorageType);
+
+ // falcon post processing
+ props.put(ARG.feedNames.getPropName(), feed.getName());
+ props.put(ARG.feedInstancePaths.getPropName(), "${coord:dataOut('output')}");
+ }
+ }
+
+ private void addOozieRetries(WORKFLOWAPP workflow) {
+ for (Object object : workflow.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof org.apache.falcon.oozie.workflow.ACTION)) {
+ continue;
+ }
+ org.apache.falcon.oozie.workflow.ACTION action = (org.apache.falcon.oozie.workflow.ACTION) object;
+ String actionName = action.getName();
+ if (FALCON_ACTIONS.contains(actionName)) {
+ decorateWithOozieRetries(action);
+ }
+ }
+ }
+
+ private void propagateUserWorkflowProperties(Map<String, String> props, String policy) {
+ props.put("userWorkflowName", policy + "-policy");
+ props.put("userWorkflowEngine", "falcon");
+
+ String version;
+ try {
+ version = BuildProperties.get().getProperty("build.version");
+ } catch (Exception e) { // unfortunate that this is only available in prism/webapp
+ version = "0.5";
+ }
+ props.put("userWorkflowVersion", version);
+ }
}
[2/5] FALCON-356 Merge OozieProcessMapper and OozieProcessWorkflowBuilder. Contributed by Shwetha GS
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 4e5e8c6..c31842b 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -21,41 +21,106 @@ package org.apache.falcon.workflow;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
-import org.apache.falcon.converter.OozieProcessMapper;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.oozie.coordinator.CONTROLS;
+import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
+import org.apache.falcon.oozie.coordinator.DATAIN;
+import org.apache.falcon.oozie.coordinator.DATAOUT;
+import org.apache.falcon.oozie.coordinator.DATASETS;
+import org.apache.falcon.oozie.coordinator.INPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
+import org.apache.falcon.oozie.coordinator.SYNCDATASET;
+import org.apache.falcon.oozie.coordinator.WORKFLOW;
+import org.apache.falcon.oozie.workflow.ACTION;
+import org.apache.falcon.oozie.workflow.DELETE;
+import org.apache.falcon.oozie.workflow.PIG;
+import org.apache.falcon.oozie.workflow.PREPARE;
+import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.update.UpdateHelper;
+import org.apache.falcon.util.OozieUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.OozieClient;
-import java.util.*;
+import javax.xml.bind.JAXBElement;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
/**
* Oozie workflow builder for falcon entities.
*/
public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
+ private static final Logger LOG = Logger.getLogger(OozieProcessWorkflowBuilder.class);
+
+ public OozieProcessWorkflowBuilder(Process entity) {
+ super(entity);
+ }
@Override
- public Map<String, Properties> newWorkflowSchedule(Process process, List<String> clusters) throws FalconException {
+ public Map<String, Properties> newWorkflowSchedule(String... clusters) throws FalconException {
Map<String, Properties> propertiesMap = new HashMap<String, Properties>();
for (String clusterName : clusters) {
- org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
- Properties properties = newWorkflowSchedule(process, processCluster.getValidity().getStart(), clusterName,
- CurrentUser.getUser());
- if (properties != null) {
- propertiesMap.put(clusterName, properties);
+ org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity, clusterName);
+ if (processCluster.getValidity().getStart().compareTo(processCluster.getValidity().getEnd()) >= 0) {
+ LOG.info("process validity start <= end for cluster " + clusterName + ". Skipping schedule");
+ break;
+ }
+
+ Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
+ Path bundlePath = EntityUtil.getNewStagingPath(cluster, entity);
+ map(cluster, bundlePath);
+ Properties properties = createAppProperties(clusterName, bundlePath, CurrentUser.getUser());
+
+ //Add libpath
+ String libPath = entity.getWorkflow().getLib();
+ if (!StringUtils.isEmpty(libPath)) {
+ String path = libPath.replace("${nameNode}", "");
+ properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+ }
+
+ if (entity.getInputs() != null) {
+ for (Input in : entity.getInputs().getInputs()) {
+ if (in.isOptional()) {
+ addOptionalInputProperties(properties, in, clusterName);
+ }
+ }
}
+ propertiesMap.put(clusterName, properties);
}
return propertiesMap;
}
@@ -100,51 +165,734 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
}
@Override
- public Properties newWorkflowSchedule(Process process, Date startDate, String clusterName, String user)
- throws FalconException {
- org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, clusterName);
- if (!startDate.before(processCluster.getValidity().getEnd())) { // start time >= end time
- return null;
+ public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
+ org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
+ return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
+ process.getFrequency(), process.getTimezone(), now);
+ }
+
+ @Override
+ public String[] getWorkflowNames() {
+ return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString()};
+ }
+
+ private static final String DEFAULT_WF_TEMPLATE = "/config/workflow/process-parent-workflow.xml";
+ private static final int THIRTY_MINUTES = 30 * 60 * 1000;
+
+ @Override
+ public List<COORDINATORAPP> getCoordinators(Cluster cluster, Path bundlePath) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+
+ //Copy user workflow and lib to staging dir
+ Map<String, String> checksums = UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getPath()),
+ new Path(bundlePath, EntityUtil.PROCESS_USER_DIR));
+ if (entity.getWorkflow().getLib() != null && fs.exists(new Path(entity.getWorkflow().getLib()))) {
+ checksums.putAll(UpdateHelper.checksumAndCopy(fs, new Path(entity.getWorkflow().getLib()),
+ new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR)));
+ }
+
+ writeChecksums(fs, new Path(bundlePath, EntityUtil.PROCESS_CHECKSUM_FILE), checksums);
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy user workflow/lib", e);
+ }
+
+ List<COORDINATORAPP> apps = new ArrayList<COORDINATORAPP>();
+ apps.add(createDefaultCoordinator(cluster, bundlePath));
+
+ return apps;
+ }
+
+ private void writeChecksums(FileSystem fs, Path path, Map<String, String> checksums) throws FalconException {
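+ // persist one key=value checksum entry per line for later change detection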
+ try {
+ FSDataOutputStream stream = fs.create(path);
+ try {
+ for (Map.Entry<String, String> entry : checksums.entrySet()) {
+ stream.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+ }
+ } finally {
+ stream.close();
+ }
+ } catch (IOException e) {
+ throw new FalconException("Failed to copy user workflow/lib", e);
}
+ }
+
+ private Path getUserWorkflowPath(Cluster cluster, Path bundlePath) throws FalconException {
+ try {
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+ Path wfPath = new Path(entity.getWorkflow().getPath());
+ if (fs.isFile(wfPath)) {
+ return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR + "/" + wfPath.getName());
+ } else {
+ return new Path(bundlePath, EntityUtil.PROCESS_USER_DIR);
+ }
+ } catch (IOException e) {
+ throw new FalconException("Failed to get workflow path", e);
+ }
+ }
+
+ private Path getUserLibPath(Cluster cluster, Path bundlePath) throws FalconException {
+ try {
+ if (entity.getWorkflow().getLib() == null) {
+ return null;
+ }
+ Path libPath = new Path(entity.getWorkflow().getLib());
- Cluster cluster = CONFIG_STORE.get(EntityType.CLUSTER, processCluster.getName());
- Path bundlePath = EntityUtil.getNewStagingPath(cluster, process);
- Process processClone = (Process) process.copy();
- EntityUtil.setStartDate(processClone, clusterName, startDate);
+ FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(ClusterHelper.getConfiguration(cluster));
+ if (fs.isFile(libPath)) {
+ return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR + "/" + libPath.getName());
+ } else {
+ return new Path(bundlePath, EntityUtil.PROCESS_USERLIB_DIR);
+ }
+ } catch (IOException e) {
+ throw new FalconException("Failed to get user lib path", e);
+ }
+ }
- OozieProcessMapper mapper = new OozieProcessMapper(processClone);
- if (!mapper.map(cluster, bundlePath)) {
+ /**
+ * Creates default oozie coordinator.
+ *
+ * @param cluster - Cluster for which the coordinator app needs to be created
+ * @param bundlePath - bundle path
+ * @return COORDINATORAPP
+ * @throws FalconException on Error
+ */
+ public COORDINATORAPP createDefaultCoordinator(Cluster cluster, Path bundlePath) throws FalconException {
+ if (entity == null) {
return null;
}
- Properties properties = createAppProperties(clusterName, bundlePath, user);
+ COORDINATORAPP coord = new COORDINATORAPP();
+ String coordName = EntityUtil.getWorkflowName(Tag.DEFAULT, entity).toString();
+ Path coordPath = getCoordPath(bundlePath, coordName);
+
+ // coord attributes
+ initializeCoordAttributes(cluster, entity, coord, coordName);
+
+ CONTROLS controls = initializeControls(entity); // controls
+ coord.setControls(controls);
+
+ // Configuration
+ Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+
+ initializeInputPaths(cluster, entity, coord, props); // inputs
+ initializeOutputPaths(cluster, entity, coord, props); // outputs
+
+ Workflow processWorkflow = entity.getWorkflow();
+ propagateUserWorkflowProperties(processWorkflow, props, entity.getName());
+
+ // create parent wf
+ createWorkflow(cluster, entity, processWorkflow, coordName, coordPath);
+
+ WORKFLOW wf = new WORKFLOW();
+ wf.setAppPath(getStoragePath(coordPath.toString()));
+ wf.setConfiguration(getCoordConfig(props));
+
+ // set coord action to parent wf
+ org.apache.falcon.oozie.coordinator.ACTION action = new org.apache.falcon.oozie.coordinator.ACTION();
+ action.setWorkflow(wf);
+ coord.setAction(action);
+
+ return coord;
+ }
+
+ private void initializeCoordAttributes(Cluster cluster, Process process, COORDINATORAPP coord, String coordName) {
+ coord.setName(coordName);
+ org.apache.falcon.entity.v0.process.Cluster processCluster =
+ ProcessHelper.getCluster(process, cluster.getName());
+ coord.setStart(SchemaHelper.formatDateUTC(processCluster.getValidity().getStart()));
+ coord.setEnd(SchemaHelper.formatDateUTC(processCluster.getValidity().getEnd()));
+ coord.setTimezone(process.getTimezone().getID());
+ coord.setFrequency("${coord:" + process.getFrequency().toString() + "}");
+ }
+
+ private CONTROLS initializeControls(Process process)
+ throws FalconException {
+ CONTROLS controls = new CONTROLS();
+ controls.setConcurrency(String.valueOf(process.getParallel()));
+ controls.setExecution(process.getOrder().name());
+
+ Frequency timeout = process.getTimeout();
+ long frequencyInMillis = ExpressionHelper.get().evaluate(process.getFrequency().toString(), Long.class);
+ long timeoutInMillis;
+ if (timeout != null) {
+ timeoutInMillis = ExpressionHelper.get().
+ evaluate(process.getTimeout().toString(), Long.class);
+ } else {
+ timeoutInMillis = frequencyInMillis * 6;
+ if (timeoutInMillis < THIRTY_MINUTES) {
+ timeoutInMillis = THIRTY_MINUTES;
+ }
+ }
+ controls.setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
+
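+ // throttle caps materialized-but-waiting actions at twice the timeout-to-frequency ratio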
+ if (timeoutInMillis / frequencyInMillis * 2 > 0) {
+ controls.setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
+ }
+
+ return controls;
+ }
- //Add libpath
- String libPath = process.getWorkflow().getLib();
- if (!StringUtils.isEmpty(libPath)) {
- String path = libPath.replace("${nameNode}", "");
- properties.put(OozieClient.LIBPATH, "${nameNode}" + path);
+ private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+ Map<String, String> props) throws FalconException {
+ if (process.getInputs() == null) {
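+ // sentinel values consumed by the late-data handler and post-processing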
+ props.put("falconInputFeeds", "NONE");
+ props.put("falconInPaths", "IGNORE");
+ return;
}
- if (process.getInputs() != null) {
- for (Input in : process.getInputs().getInputs()) {
- if (in.isOptional()) {
- addOptionalInputProperties(properties, in, clusterName);
+ List<String> inputFeeds = new ArrayList<String>();
+ List<String> inputPaths = new ArrayList<String>();
+ List<String> inputFeedStorageTypes = new ArrayList<String>();
+ for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
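+ // only mandatory inputs become coordinator datasets and input events; the coordinator must not wait on optional feeds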
+ if (!input.isOptional()) {
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+ if (coord.getInputEvents() == null) {
+ coord.setInputEvents(new INPUTEVENTS());
}
+
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+ DATAIN datain = createDataIn(input);
+ coord.getInputEvents().getDataIn().add(datain);
}
+
+ String inputExpr = null;
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+ props.put(input.getName(), inputExpr);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+ propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+ }
+
+ inputFeeds.add(feed.getName());
+ inputPaths.add(inputExpr);
+ inputFeedStorageTypes.add(storage.getType().name());
}
- return properties;
+
+ propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
}
- @Override
- public Date getNextStartTime(Process process, String cluster, Date now) throws FalconException {
- org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(process, cluster);
- return EntityUtil.getNextStartTime(processCluster.getValidity().getStart(),
- process.getFrequency(), process.getTimezone(), now);
+ private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+ List<String> inputFeedStorageTypes, Map<String, String> props) {
+ // populate late data handler - should-record action
+ props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+ props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+ // storage type for each corresponding feed sent as a param to LateDataHandler
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
+ }
+
+ private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
+ Map<String, String> props) throws FalconException {
+ if (process.getOutputs() == null) {
+ props.put(ARG.feedNames.getPropName(), "NONE");
+ props.put(ARG.feedInstancePaths.getPropName(), "IGNORE");
+ return;
+ }
+
+ if (coord.getDatasets() == null) {
+ coord.setDatasets(new DATASETS());
+ }
+
+ if (coord.getOutputEvents() == null) {
+ coord.setOutputEvents(new OUTPUTEVENTS());
+ }
+
+ List<String> outputFeeds = new ArrayList<String>();
+ List<String> outputPaths = new ArrayList<String>();
+ for (Output output : process.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
+
+ DATAOUT dataout = createDataOut(output);
+ coord.getOutputEvents().getDataOut().add(dataout);
+
+ String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
+ outputFeeds.add(feed.getName());
+ outputPaths.add(outputExpr);
+
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ props.put(output.getName(), outputExpr);
+
+ propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+ }
+ }
+
+ // Output feed name and path for parent workflow
+ props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+ props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+ }
+
+ private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+ String datasetName, LocationType locationType) throws FalconException {
+
+ SYNCDATASET syncdataset = new SYNCDATASET();
+ syncdataset.setName(datasetName);
+ syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+ String uriTemplate = storage.getUriTemplate(locationType);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ syncdataset.setUriTemplate(uriTemplate);
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+ syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+ syncdataset.setTimezone(feed.getTimezone().getID());
+
+ if (feed.getAvailabilityFlag() == null) {
+ syncdataset.setDoneFlag("");
+ } else {
+ syncdataset.setDoneFlag(feed.getAvailabilityFlag());
+ }
+
+ return syncdataset;
+ }
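+
+ // Illustrative only (the URI below is an assumed feed location): a table feed
+ // registered as thrift://metastore:9083/db/table/ds=${YEAR} surfaces to Oozie
+ // as hcat://metastore:9083/db/table/ds=${YEAR} after the scheme rewrite above.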
+
+ private DATAOUT createDataOut(Output output) {
+ DATAOUT dataout = new DATAOUT();
+ dataout.setName(output.getName());
+ dataout.setDataset(output.getName());
+ dataout.setInstance(getELExpression(output.getInstance()));
+ return dataout;
+ }
+
+ private DATAIN createDataIn(Input input) {
+ DATAIN datain = new DATAIN();
+ datain.setName(input.getName());
+ datain.setDataset(input.getName());
+ datain.setStartInstance(getELExpression(input.getStart()));
+ datain.setEndInstance(getELExpression(input.getEnd()));
+ return datain;
+ }
+
+ private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+ Storage storage, Map<String, String> props)
+ throws FalconException {
+
+ // stats and meta paths
+ createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+ }
+
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+ COORDINATORAPP coord, Map<String, String> props, Storage storage)
+ throws FalconException {
+
+ String name = output.getName();
+ String type = locType.name().toLowerCase();
+
+ SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
+ coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
+ DATAOUT dataout = new DATAOUT();
+ dataout.setName(name + type);
+ dataout.setDataset(name + type);
+ dataout.setInstance(getELExpression(output.getInstance()));
+
+ OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+ if (outputEvents == null) {
+ outputEvents = new OUTPUTEVENTS();
+ coord.setOutputEvents(outputEvents);
+ }
+ outputEvents.getDataOut().add(dataout);
+
+ String outputExpr = "${coord:dataOut('" + name + type + "')}";
+ props.put(name + "." + type, outputExpr);
+ }
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
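+
+ // Illustrative only (assumes an output named 'output'): each call to
+ // createOutputEvent registers a dataset named <output><type>, e.g. outputstats,
+ // and exposes a property such as output.stats=${coord:dataOut('outputstats')}.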
+
+ private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
+ Map<String, String> props, String prefix) {
+ props.put(prefix + "_storage_type", tableStorage.getType().name());
+ props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+ props.put(prefix + "_database", tableStorage.getDatabase());
+ props.put(prefix + "_table", tableStorage.getTable());
+ }
+
+ private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + input.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_partition_filter_pig",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'pig')}");
+ props.put(prefix + "_partition_filter_hive",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'hive')}");
+ props.put(prefix + "_partition_filter_java",
+ "${coord:dataInPartitionFilter('" + input.getName() + "', 'java')}");
+ }
+
+ private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + output.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_dataout_partitions",
+ "${coord:dataOutPartitions('" + output.getName() + "')}");
+ props.put(prefix + "_dated_partition_value", "${coord:dataOutPartitionValue('"
+ + output.getName() + "', '" + tableStorage.getDatedPartitionKey() + "')}");
+ }
+
+ private String join(Iterator<String> itr, char sep) {
+ String joinedStr = StringUtils.join(itr, sep);
+ if (joinedStr.isEmpty()) {
+ joinedStr = "null";
+ }
+ return joinedStr;
+ }
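+
+ // Note: an empty join collapses to the literal string "null", so these
+ // properties are never handed to downstream actions as empty values.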
+
+ private String getELExpression(String expr) {
+ if (expr != null) {
+ expr = "${" + expr + "}";
+ }
+ return expr;
}
@Override
- public String[] getWorkflowNames(Process process) {
- return new String[]{EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString()};
+ protected Map<String, String> getEntityProperties() {
+ Map<String, String> props = new HashMap<String, String>();
+ if (entity.getProperties() != null) {
+ for (Property prop : entity.getProperties().getProperties()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ }
+ return props;
+ }
+
+ private void propagateUserWorkflowProperties(Workflow processWorkflow,
+ Map<String, String> props, String processName) {
+ props.put("userWorkflowName", ProcessHelper.getProcessWorkflowName(
+ processWorkflow.getName(), processName));
+ props.put("userWorkflowVersion", processWorkflow.getVersion());
+ props.put("userWorkflowEngine", processWorkflow.getEngine().value());
+ }
+
+ protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+ String wfName, Path parentWfPath) throws FalconException {
+ WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
+ wfApp.setName(wfName);
+ try {
+ addLibExtensionsToWorkflow(cluster, wfApp, EntityType.PROCESS, null);
+ } catch (IOException e) {
+ throw new FalconException("Failed to add library extensions for the workflow", e);
+ }
+
+ String userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent()).toString();
+ EngineType engineType = processWorkflow.getEngine();
+ for (Object object : wfApp.getDecisionOrForkOrJoin()) {
+ if (!(object instanceof ACTION)) {
+ continue;
+ }
+
+ ACTION action = (ACTION) object;
+ String actionName = action.getName();
+ if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
+ action.getSubWorkflow().setAppPath("${nameNode}" + userWfPath);
+ } else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
+ decoratePIGAction(cluster, process, action.getPig(), parentWfPath);
+ } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+ decorateHiveAction(cluster, process, action, parentWfPath);
+ } else if (FALCON_ACTIONS.contains(actionName)) {
+ decorateWithOozieRetries(action);
+ }
+ }
+
+ // Create the parent workflow
+ marshal(cluster, wfApp, parentWfPath);
+ }
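+
+ // The workflow template ships one placeholder action per engine
+ // (user-oozie-workflow, user-pig-job, user-hive-job); only the action matching
+ // the process's engine is decorated above, while the built-in falcon actions
+ // simply get Oozie retries.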
+
+ private void decoratePIGAction(Cluster cluster, Process process,
+ PIG pigAction, Path parentWfPath) throws FalconException {
+ Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+ pigAction.setScript("${nameNode}" + userWfPath.toString());
+
+ addPrepareDeleteOutputPath(process, pigAction);
+
+ final List<String> paramList = pigAction.getParam();
+ addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+ addOutputFeedsAsParams(paramList, process, cluster);
+
+ propagateProcessProperties(pigAction, process);
+
+ Storage.TYPE storageType = getStorageType(cluster, process);
+ if (Storage.TYPE.TABLE == storageType) {
+ // adds hive-site.xml in pig classpath
+ setupHiveConfiguration(cluster, parentWfPath, ""); // the empty prefix is intentional - do not add one
+ pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+ }
+
+ addArchiveForCustomJars(cluster, pigAction.getArchive(),
+ getUserLibPath(cluster, parentWfPath.getParent()));
+ }
+
+ private void decorateHiveAction(Cluster cluster, Process process, ACTION wfAction,
+ Path parentWfPath) throws FalconException {
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(wfAction);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Path userWfPath = getUserWorkflowPath(cluster, parentWfPath.getParent());
+ hiveAction.setScript("${nameNode}" + userWfPath.toString());
+
+ addPrepareDeleteOutputPath(process, hiveAction);
+
+ final List<String> paramList = hiveAction.getParam();
+ addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+ addOutputFeedsAsParams(paramList, process, cluster);
+
+ propagateProcessProperties(hiveAction, process);
+
+ setupHiveConfiguration(cluster, parentWfPath, "falcon-");
+
+ addArchiveForCustomJars(cluster, hiveAction.getArchive(),
+ getUserLibPath(cluster, parentWfPath.getParent()));
+
+ OozieUtils.marshalHiveAction(wfAction, actionJaxbElement);
+ }
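+
+ // Hive actions live in a separate JAXB namespace, so the action element is
+ // unmarshalled into org.apache.falcon.oozie.hive.ACTION, decorated, and then
+ // marshalled back into the parent workflow above.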
+
+ private void addPrepareDeleteOutputPath(Process process,
+ PIG pigAction) throws FalconException {
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
+ final PREPARE prepare = new PREPARE();
+ final List<DELETE> deleteList = prepare.getDelete();
+
+ for (String deletePath : deleteOutputPathList) {
+ final DELETE delete = new DELETE();
+ delete.setPath(deletePath);
+ deleteList.add(delete);
+ }
+
+ if (!deleteList.isEmpty()) {
+ pigAction.setPrepare(prepare);
+ }
+ }
+
+ private void addPrepareDeleteOutputPath(Process process,
+ org.apache.falcon.oozie.hive.ACTION hiveAction)
+ throws FalconException {
+
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
+ org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+ List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+ for (String deletePath : deleteOutputPathList) {
+ org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+ delete.setPath(deletePath);
+ deleteList.add(delete);
+ }
+
+ if (!deleteList.isEmpty()) {
+ hiveAction.setPrepare(prepare);
+ }
+ }
+
+ private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+ final List<String> deleteList = new ArrayList<String>();
+ if (process.getOutputs() == null) {
+ return deleteList;
+ }
+
+ for (Output output : process.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+ if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+ continue; // prepare delete only applies to FileSystem storage
+ }
+
+ deleteList.add("${wf:conf('" + output.getName() + "')}");
+ }
+
+ return deleteList;
+ }
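+
+ // Each ${wf:conf('<output name>')} expression resolves at runtime to the
+ // ${coord:dataOut(...)} path that initializeOutputPaths put into props, so
+ // every filesystem output path is cleaned up before the user action runs.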
+
+ private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+ String engineType) throws FalconException {
+ if (process.getInputs() == null) {
+ return;
+ }
+
+ for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ final String inputName = input.getName();
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+ Map<String, String> props = new HashMap<String, String>();
+ propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+ for (String key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+
+ paramList.add(paramName + "_filter=${wf:conf('"
+ + paramName + "_partition_filter_" + engineType + "')}");
+ }
+ }
+ }
+
+ private void addOutputFeedsAsParams(List<String> paramList, Process process,
+ Cluster cluster) throws FalconException {
+ if (process.getOutputs() == null) {
+ return;
+ }
+
+ for (Output output : process.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ final String outputName = output.getName(); // no prefix for backwards compatibility
+ paramList.add(outputName + "=${" + outputName + "}");
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ Map<String, String> props = new HashMap<String, String>();
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+ for (String key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+ }
+ }
+ }
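+
+ // Illustrative only (assumes a filesystem input 'input' and a table output
+ // 'output'): the params added above would include
+ //   input=${input}
+ //   falcon_output_dataout_partitions=${wf:conf('falcon_output_dataout_partitions')}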
+
+ private void propagateProcessProperties(PIG pigAction, Process process) {
+ org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+ if (processProperties == null) {
+ return;
+ }
+
+ // Propagate user defined properties to job configuration
+ final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
+ pigAction.getConfiguration().getProperty();
+
+ // Propagate user defined properties to pig script as macros
+ // passed as parameters -p name=value that can be accessed as $name
+ final List<String> paramList = pigAction.getParam();
+
+ for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+ org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
+ new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
+ configProperty.setName(property.getName());
+ configProperty.setValue(property.getValue());
+ configuration.add(configProperty);
+
+ paramList.add(property.getName() + "=" + property.getValue());
+ }
+ }
+
+ private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+ org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+ if (processProperties == null) {
+ return;
+ }
+
+ // Propagate user defined properties to job configuration
+ final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+ hiveAction.getConfiguration().getProperty();
+
+ // Propagate user defined properties to the hive script
+ // passed as parameters name=value that can be accessed as ${name}
+ final List<String> paramList = hiveAction.getParam();
+
+ for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+ org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+ new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+ configProperty.setName(property.getName());
+ configProperty.setValue(property.getValue());
+ configuration.add(configProperty);
+
+ paramList.add(property.getName() + "=" + property.getValue());
+ }
+ }
+
+ private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+ Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+ if (process.getInputs() == null) {
+ return storageType;
+ }
+
+ for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ storageType = FeedHelper.getStorageType(feed, cluster);
+ if (Storage.TYPE.TABLE == storageType) {
+ break;
+ }
+ }
+
+ return storageType;
+ }
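+
+ // TABLE wins over FILESYSTEM: if any input is table backed, the user action
+ // needs hive-site.xml on its classpath, which is why decoratePIGAction checks
+ // this return value before wiring in the hive configuration.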
+
+ // Creates hive-site.xml under the workflow's conf directory.
+ private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+ String prefix) throws FalconException {
+ String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+ try {
+ FileSystem fs = HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
+ Path confPath = new Path(wfPath, "conf");
+ createHiveConf(fs, confPath, catalogUrl, cluster, prefix);
+ } catch (IOException e) {
+ throw new FalconException(e);
+ }
+ }
+
+ private void addArchiveForCustomJars(Cluster cluster, List<String> archiveList,
+ Path libPath) throws FalconException {
+ if (libPath == null) {
+ return;
+ }
+
+ try {
+ final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
+ if (fs.isFile(libPath)) { // File, not a Dir
+ archiveList.add(libPath.toString());
+ return;
+ }
+
+ // lib path is a directory - add each jar under it to the archive
+ final FileStatus[] fileStatuses = fs.listStatus(libPath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ try {
+ return fs.isFile(path) && path.getName().endsWith(".jar");
+ } catch (IOException ignore) {
+ return false;
+ }
+ }
+ });
+
+ for (FileStatus fileStatus : fileStatuses) {
+ archiveList.add(fileStatus.getPath().toString());
+ }
+ } catch (IOException e) {
+ throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
deleted file mode 100644
index fbda0ea..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperLateProcessTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.conf.Configuration;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-
-/**
- * Test class for late data processing.
- */
-public class OozieProcessMapperLateProcessTest {
-
- private static final String CLUSTER_XML = "/config/late/late-cluster.xml";
- private static final String FEED1_XML = "/config/late/late-feed1.xml";
- private static final String FEED2_XML = "/config/late/late-feed2.xml";
- private static final String FEED3_XML = "/config/late/late-feed3.xml";
- private static final String PROCESS1_XML = "/config/late/late-process1.xml";
- private static final String PROCESS2_XML = "/config/late/late-process2.xml";
- private static final ConfigurationStore STORE = ConfigurationStore.get();
-
- private static EmbeddedCluster dfsCluster;
-
- @BeforeClass
- public void setUpDFS() throws Exception {
-
- cleanupStore();
-
- dfsCluster = EmbeddedCluster.newCluster("testCluster");
- Configuration conf = dfsCluster.getConf();
- String hdfsUrl = conf.get("fs.default.name");
-
- Cluster cluster = (Cluster) EntityType.CLUSTER.getUnmarshaller()
- .unmarshal(this.getClass().getResource(CLUSTER_XML));
- ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
- STORE.publish(EntityType.CLUSTER, cluster);
-
- Feed feed1 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
- this.getClass().getResource(FEED1_XML));
- Feed feed2 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
- this.getClass().getResource(FEED2_XML));
- Feed feed3 = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
- this.getClass().getResource(FEED3_XML));
-
- STORE.publish(EntityType.FEED, feed1);
- STORE.publish(EntityType.FEED, feed2);
- STORE.publish(EntityType.FEED, feed3);
-
- Process process1 = (Process) EntityType.PROCESS.getUnmarshaller()
- .unmarshal(this.getClass().getResource(PROCESS1_XML));
- STORE.publish(EntityType.PROCESS, process1);
- Process process2 = (Process) EntityType.PROCESS.getUnmarshaller()
- .unmarshal(this.getClass().getResource(PROCESS2_XML));
- STORE.publish(EntityType.PROCESS, process2);
- }
-
- private void cleanupStore() throws FalconException {
- STORE.remove(EntityType.PROCESS, "late-process1");
- STORE.remove(EntityType.PROCESS, "late-process2");
- STORE.remove(EntityType.FEED, "late-feed1");
- STORE.remove(EntityType.FEED, "late-feed2");
- STORE.remove(EntityType.FEED, "late-feed3");
- STORE.remove(EntityType.CLUSTER, "late-cluster");
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- cleanupStore();
- dfsCluster.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/e2545b08/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
deleted file mode 100644
index 22bf9fe..0000000
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.converter;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.cluster.Interfacetype;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.entity.v0.process.Validity;
-import org.apache.falcon.entity.v0.process.Workflow;
-import org.apache.falcon.messaging.EntityInstanceMessage;
-import org.apache.falcon.oozie.bundle.BUNDLEAPP;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
-import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
-import org.apache.falcon.oozie.coordinator.SYNCDATASET;
-import org.apache.falcon.oozie.workflow.ACTION;
-import org.apache.falcon.oozie.workflow.DECISION;
-import org.apache.falcon.oozie.workflow.PIG;
-import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Test for the Falcon entities mapping into Oozie artifacts.
- */
-public class OozieProcessMapperTest extends AbstractTestBase {
-
- private String hdfsUrl;
- private FileSystem fs;
-
- @BeforeClass
- public void setUpDFS() throws Exception {
- CurrentUser.authenticate("falcon");
-
- EmbeddedCluster cluster = EmbeddedCluster.newCluster("testCluster");
- Configuration conf = cluster.getConf();
- hdfsUrl = conf.get("fs.default.name");
- }
-
- @BeforeMethod
- public void setUp() throws Exception {
- super.setup();
-
- ConfigurationStore store = ConfigurationStore.get();
- Cluster cluster = store.get(EntityType.CLUSTER, "corp");
- ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
- ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
- fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
- fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
-
- Process process = store.get(EntityType.PROCESS, "clicksummary");
- Path wfpath = new Path(process.getWorkflow().getPath());
- assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
- }
-
- public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
- assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
- Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
- assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
- assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
- assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
- assertEquals(process.getTimezone().getID(), coord.getTimezone());
-
- assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
- assertEquals(process.getOrder().name(), coord.getControls().getExecution());
-
- assertEquals(process.getInputs().getInputs().get(0).getName(),
- coord.getInputEvents().getDataIn().get(0).getName());
- assertEquals(process.getInputs().getInputs().get(0).getName(),
- coord.getInputEvents().getDataIn().get(0).getDataset());
- assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
- coord.getInputEvents().getDataIn().get(0).getStartInstance());
- assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
- coord.getInputEvents().getDataIn().get(0).getEndInstance());
-
- assertEquals(process.getInputs().getInputs().get(1).getName(),
- coord.getInputEvents().getDataIn().get(1).getName());
- assertEquals(process.getInputs().getInputs().get(1).getName(),
- coord.getInputEvents().getDataIn().get(1).getDataset());
- assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
- coord.getInputEvents().getDataIn().get(1).getStartInstance());
- assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
- coord.getInputEvents().getDataIn().get(1).getEndInstance());
-
- assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
- coord.getOutputEvents().getDataOut().get(1).getName());
- assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
- coord.getOutputEvents().getDataOut().get(2).getName());
- assertEquals(process.getOutputs().getOutputs().get(0).getName() + "tmp",
- coord.getOutputEvents().getDataOut().get(3).getName());
-
- assertEquals(process.getOutputs().getOutputs().get(0).getName(),
- coord.getOutputEvents().getDataOut().get(0).getName());
- assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
- coord.getOutputEvents().getDataOut().get(0).getInstance());
- assertEquals(process.getOutputs().getOutputs().get(0).getName(),
- coord.getOutputEvents().getDataOut().get(0).getDataset());
-
- assertEquals(6, coord.getDatasets().getDatasetOrAsyncDataset().size());
-
- ConfigurationStore store = ConfigurationStore.get();
- Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
- SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
-
- final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
- assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
- assertEquals(feed.getTimezone().getID(), ds.getTimezone());
- assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
- assertEquals("", ds.getDoneFlag());
- assertEquals(ds.getUriTemplate(),
- FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
-
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
- assertEquals(props.get("mapred.job.priority"), "LOW");
-
- assertLibExtensions(coord);
- }
-
- @Test
- public void testBundle() throws Exception {
- String path = StartupProperties.get().getProperty("system.lib.location");
- if (!new File(path).exists()) {
- Assert.assertTrue(new File(path).mkdirs());
- }
- Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
-
- WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
- testParentWorkflow(process, parentWorkflow);
- }
-
- @Test
- public void testBundle1() throws Exception {
- Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
- process.setFrequency(Frequency.fromString("minutes(1)"));
- process.setTimeout(Frequency.fromString("minutes(15)"));
-
- WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "30", "15");
- testParentWorkflow(process, parentWorkflow);
- }
-
- @Test
- public void testPigProcessMapper() throws Exception {
- Process process = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
- Assert.assertEquals("pig", process.getWorkflow().getEngine().value());
-
- prepare(process);
- WORKFLOWAPP parentWorkflow = initializeProcessMapper(process, "12", "360");
- testParentWorkflow(process, parentWorkflow);
-
- List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
- ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
- Assert.assertEquals("user-pig-job", pigActionNode.getName());
-
- final PIG pigAction = pigActionNode.getPig();
- Assert.assertEquals(pigAction.getScript(), "${nameNode}/falcon/staging/workflows/pig-process/user/id.pig");
- Assert.assertNotNull(pigAction.getPrepare());
- Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
- Assert.assertFalse(pigAction.getParam().isEmpty());
- Assert.assertEquals(5, pigAction.getParam().size());
- Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
- Assert.assertTrue(pigAction.getFile().size() > 0);
-
- ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
- Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
- Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
- }
-
- @Test
- public void testHiveProcessMapper() throws Exception {
- URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
- Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
- resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
- Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
- resource = this.getClass().getResource("/config/process/hive-process.xml");
- Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
- Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
- prepare(process);
- OozieProcessMapper mapper = new OozieProcessMapper(process);
- Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
- mapper.map(cluster, bundlePath);
- assertTrue(fs.exists(bundlePath));
-
- BUNDLEAPP bundle = getBundle(bundlePath);
- assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
- assertEquals(1, bundle.getCoordinator().size());
- assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
- bundle.getCoordinator().get(0).getName());
- String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
- COORDINATORAPP coord = getCoordinator(new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- // verify table props
- Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
- for (Map.Entry<String, String> entry : props.entrySet()) {
- if (expected.containsKey(entry.getKey())) {
- Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
- }
- }
-
- String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
- testParentWorkflow(process, parentWorkflow);
-
- List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
-
- ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
- Assert.assertEquals("user-hive-job", hiveNode.getName());
-
- JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
- org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
-
- Assert.assertEquals(hiveAction.getScript(),
- "${nameNode}/falcon/staging/workflows/hive-process/user/script.hql");
- Assert.assertNull(hiveAction.getPrepare());
- Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
- Assert.assertFalse(hiveAction.getParam().isEmpty());
- Assert.assertEquals(11, hiveAction.getParam().size());
- }
-
- private void prepare(Process process) throws IOException {
- Path wf = new Path(process.getWorkflow().getPath());
- fs.mkdirs(wf.getParent());
- fs.create(wf).close();
- }
-
- @Test
- public void testProcessMapperForTableStorage() throws Exception {
- URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
- Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.FEED, inFeed);
-
- resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
- Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.FEED, outFeed);
-
- resource = this.getClass().getResource("/config/process/pig-process-table.xml");
- Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.PROCESS, process);
-
- Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
- OozieProcessMapper mapper = new OozieProcessMapper(process);
- Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
- mapper.map(cluster, bundlePath);
- assertTrue(fs.exists(bundlePath));
-
- BUNDLEAPP bundle = getBundle(bundlePath);
- assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
- assertEquals(1, bundle.getCoordinator().size());
- assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
- bundle.getCoordinator().get(0).getName());
- String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
- COORDINATORAPP coord = getCoordinator(new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- // verify table props
- Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
- for (Map.Entry<String, String> entry : props.entrySet()) {
- if (expected.containsKey(entry.getKey())) {
- Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
- }
- }
-
- // verify the late data params
- Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
- Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
- Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
-
- // verify the post processing params
- Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
- Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
- }
-
- private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
- Cluster cluster) throws FalconException {
- Map<String, String> expected = new HashMap<String, String>();
- for (Input input : process.getInputs().getInputs()) {
- CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
- propagateStorageProperties(input.getName(), storage, expected);
- }
-
- for (Output output : process.getOutputs().getOutputs()) {
- CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
- propagateStorageProperties(output.getName(), storage, expected);
- }
-
- return expected;
- }
-
- private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
- Map<String, String> props) {
- String prefix = "falcon_" + feedName;
- props.put(prefix + "_storage_type", tableStorage.getType().name());
- props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
- props.put(prefix + "_database", tableStorage.getDatabase());
- props.put(prefix + "_table", tableStorage.getTable());
-
- if (prefix.equals("falcon_input")) {
- props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
- props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
- props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
- } else if (prefix.equals("falcon_output")) {
- props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
- }
- }
-
- @Test
- public void testProcessWorkflowMapper() throws Exception {
- Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
- Workflow processWorkflow = process.getWorkflow();
- Assert.assertEquals("test", processWorkflow.getName());
- Assert.assertEquals("1.0.0", processWorkflow.getVersion());
- }
-
- @SuppressWarnings("unchecked")
- private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
- String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
- WORKFLOWAPP wf = ((JAXBElement<WORKFLOWAPP>) jaxbContext.createUnmarshaller().unmarshal(
- fs.open(new Path(wfPath, "workflow.xml")))).getValue();
- List<Object> actions = wf.getDecisionOrForkOrJoin();
- for (Object obj : actions) {
- if (!(obj instanceof ACTION)) {
- continue;
- }
- ACTION action = (ACTION) obj;
- List<String> files = null;
- if (action.getJava() != null) {
- files = action.getJava().getFile();
- } else if (action.getPig() != null) {
- files = action.getPig().getFile();
- } else if (action.getMapReduce() != null) {
- files = action.getMapReduce().getFile();
- }
- if (files != null) {
- Assert.assertTrue(files.get(files.size() - 1)
- .endsWith("/projects/falcon/working/libext/PROCESS/ext.jar"));
- }
- }
- }
-
- private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
- throws Exception {
- Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
- OozieProcessMapper mapper = new OozieProcessMapper(process);
- Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
- mapper.map(cluster, bundlePath);
- assertTrue(fs.exists(bundlePath));
-
- BUNDLEAPP bundle = getBundle(bundlePath);
- assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
- assertEquals(1, bundle.getCoordinator().size());
- assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
- bundle.getCoordinator().get(0).getName());
- String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
- COORDINATORAPP coord = getCoordinator(new Path(coordPath));
- testDefCoordMap(process, coord);
- assertEquals(coord.getControls().getThrottle(), throttle);
- assertEquals(coord.getControls().getTimeout(), timeout);
-
- String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
- return getParentWorkflow(new Path(wfPath));
- }
-
- public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
- Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());
-
- List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
- Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
- Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
- Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
- Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
- Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
- Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
- Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
- Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
- Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryMax());
- Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(1)).getRetryInterval());
- Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryMax());
- Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(6)).getRetryInterval());
- Assert.assertEquals("3", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryMax());
- Assert.assertEquals("1", ((ACTION) decisionOrForkOrJoin.get(7)).getRetryInterval());
- }
-
- private COORDINATORAPP getCoordinator(Path path) throws Exception {
- String bundleStr = readFile(path);
-
- Unmarshaller unmarshaller = JAXBContext.newInstance(COORDINATORAPP.class).createUnmarshaller();
- SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
- Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-coordinator-0.3.xsd"));
- unmarshaller.setSchema(schema);
- JAXBElement<COORDINATORAPP> jaxbBundle = unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), COORDINATORAPP.class);
- return jaxbBundle.getValue();
- }
-
- @SuppressWarnings("unchecked")
- private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
- String workflow = readFile(new Path(path, "workflow.xml"));
-
- JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
- Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
- return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
- }
-
- private BUNDLEAPP getBundle(Path path) throws Exception {
- String bundleStr = readFile(new Path(path, "bundle.xml"));
-
- Unmarshaller unmarshaller = JAXBContext.newInstance(BUNDLEAPP.class).createUnmarshaller();
- SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
- Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
- unmarshaller.setSchema(schema);
- JAXBElement<BUNDLEAPP> jaxbBundle = unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(bundleStr.trim().getBytes())), BUNDLEAPP.class);
- return jaxbBundle.getValue();
- }
-
- private String readFile(Path path) throws Exception {
- BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(path)));
- String line;
- StringBuilder contents = new StringBuilder();
- while ((line = reader.readLine()) != null) {
- contents.append(line);
- }
- return contents.toString();
- }
-
- @Override
- @AfterMethod
- public void cleanup() throws Exception {
- super.cleanup();
- ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
- ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
- ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
- ConfigurationStore.get().remove(EntityType.PROCESS, "dumb-process");
- }
-
- @Test
- public void testProcessWithNoInputsAndOutputs() throws Exception {
- Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
- ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
-
- URL resource = this.getClass().getResource("/config/process/dumb-process.xml");
- Process processEntity = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
- ConfigurationStore.get().publish(EntityType.PROCESS, processEntity);
-
- OozieProcessMapper mapper = new OozieProcessMapper(processEntity);
- Path bundlePath = new Path("/falcon/staging/workflows", processEntity.getName());
- mapper.map(cluster, bundlePath);
- assertTrue(fs.exists(bundlePath));
-
- BUNDLEAPP bundle = getBundle(bundlePath);
- assertEquals(EntityUtil.getWorkflowName(processEntity).toString(), bundle.getName());
- assertEquals(1, bundle.getCoordinator().size());
- assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, processEntity).toString(),
- bundle.getCoordinator().get(0).getName());
- String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
-
- COORDINATORAPP coord = getCoordinator(new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
-
- String[] expected = {
- EntityInstanceMessage.ARG.feedNames.getPropName(),
- EntityInstanceMessage.ARG.feedInstancePaths.getPropName(),
- "falconInputFeeds",
- "falconInPaths",
- "userWorkflowName",
- "userWorkflowVersion",
- "userWorkflowEngine",
- };
-
- for (String property : expected) {
- Assert.assertTrue(props.containsKey(property), "expected property missing: " + property);
- }
- }
-}