You are viewing a plain-text version of this content; the canonical version is the original message in the commits@falcon.apache.org archive.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/11/12 12:05:23 UTC
[06/12] FALCON-85: Hive (HCatalog) integration. Contributed by Venkatesh Seetharam.
FALCON-163: Merge FALCON-85 branch into main line.
Contributed by Venkatesh Seetharam.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
index e4441cc..eed7fa3 100644
--- a/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
+++ b/process/src/main/java/org/apache/falcon/converter/OozieProcessMapper.java
@@ -21,14 +21,17 @@ package org.apache.falcon.converter;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
+import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.EngineType;
@@ -57,9 +60,20 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-
+import org.apache.xerces.dom.ElementNSImpl;
+import org.w3c.dom.Document;
+
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.dom.DOMResult;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
/**
* This class maps the Falcon entities into Oozie artifacts.
@@ -108,18 +122,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
// Configuration
Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
- List<String> inputFeeds = new ArrayList<String>();
- List<String> inputPaths = new ArrayList<String>();
- initializeInputPaths(cluster, process, coord, props, inputFeeds, inputPaths); // inputs
- props.put("falconInPaths", join(inputPaths.iterator(), '#'));
- props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
-
- List<String> outputFeeds = new ArrayList<String>();
- List<String> outputPaths = new ArrayList<String>();
- initializeOutputPaths(cluster, process, coord, props, outputFeeds, outputPaths); // outputs
- // Output feed name and path for parent workflow
- props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
- props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+ initializeInputPaths(cluster, process, coord, props); // inputs
+ initializeOutputPaths(cluster, process, coord, props); // outputs
Workflow processWorkflow = process.getWorkflow();
props.put("userWorkflowEngine", processWorkflow.getEngine().value());
@@ -177,13 +181,18 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
}
private void initializeInputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props, List<String> inputFeeds, List<String> inputPaths)
- throws FalconException {
+ Map<String, String> props) throws FalconException {
if (process.getInputs() == null) {
return;
}
+ List<String> inputFeeds = new ArrayList<String>();
+ List<String> inputPaths = new ArrayList<String>();
+ List<String> inputFeedStorageTypes = new ArrayList<String>();
for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
if (!input.isOptional()) {
if (coord.getDatasets() == null) {
coord.setDatasets(new DATASETS());
@@ -192,24 +201,43 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
coord.setInputEvents(new INPUTEVENTS());
}
- SYNCDATASET syncdataset = createDataSet(input.getFeed(), cluster, input.getName(),
- LocationType.DATA);
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, input.getName(), LocationType.DATA);
coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
DATAIN datain = createDataIn(input);
coord.getInputEvents().getDataIn().add(datain);
}
- String inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
- props.put(input.getName(), inputExpr);
+ String inputExpr = null;
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ inputExpr = getELExpression("dataIn('" + input.getName() + "', '" + input.getPartition() + "')");
+ props.put(input.getName(), inputExpr);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ inputExpr = "${coord:dataIn('" + input.getName() + "')}";
+ propagateCatalogTableProperties(input, (CatalogStorage) storage, props);
+ }
+
inputFeeds.add(input.getName());
inputPaths.add(inputExpr);
+ inputFeedStorageTypes.add(storage.getType().name());
}
+
+ propagateLateDataProperties(inputFeeds, inputPaths, inputFeedStorageTypes, props);
+ }
+
+ private void propagateLateDataProperties(List<String> inputFeeds, List<String> inputPaths,
+ List<String> inputFeedStorageTypes, Map<String, String> props) {
+ // populate late data handler - should-record action
+ props.put("falconInputFeeds", join(inputFeeds.iterator(), '#'));
+ props.put("falconInPaths", join(inputPaths.iterator(), '#'));
+
+ // storage type for each corresponding feed sent as a param to LateDataHandler
+ // needed to compute usage based on storage type in LateDataHandler
+ props.put("falconInputFeedStorageTypes", join(inputFeedStorageTypes.iterator(), '#'));
}
private void initializeOutputPaths(Cluster cluster, Process process, COORDINATORAPP coord,
- Map<String, String> props, List<String> outputFeeds, List<String> outputPaths)
- throws FalconException {
+ Map<String, String> props) throws FalconException {
if (process.getOutputs() == null) {
return;
}
@@ -222,26 +250,60 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
coord.setOutputEvents(new OUTPUTEVENTS());
}
+ List<String> outputFeeds = new ArrayList<String>();
+ List<String> outputPaths = new ArrayList<String>();
for (Output output : process.getOutputs().getOutputs()) {
- SYNCDATASET syncdataset = createDataSet(output.getFeed(), cluster, output.getName(), LocationType.DATA);
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ SYNCDATASET syncdataset = createDataSet(feed, cluster, storage, output.getName(), LocationType.DATA);
coord.getDatasets().getDatasetOrAsyncDataset().add(syncdataset);
DATAOUT dataout = createDataOut(output);
coord.getOutputEvents().getDataOut().add(dataout);
String outputExpr = "${coord:dataOut('" + output.getName() + "')}";
- props.put(output.getName(), outputExpr);
outputFeeds.add(output.getName());
outputPaths.add(outputExpr);
- // stats and meta paths
- createOutputEvent(output.getFeed(), output.getName(), cluster, "stats",
- LocationType.STATS, coord, props, output.getInstance());
- createOutputEvent(output.getFeed(), output.getName(), cluster, "meta",
- LocationType.META, coord, props, output.getInstance());
- createOutputEvent(output.getFeed(), output.getName(), cluster, "tmp",
- LocationType.TMP, coord, props, output.getInstance());
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ props.put(output.getName(), outputExpr);
+
+ propagateFileSystemProperties(output, feed, cluster, coord, storage, props);
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props);
+ }
+ }
+
+ // Output feed name and path for parent workflow
+ props.put(ARG.feedNames.getPropName(), join(outputFeeds.iterator(), ','));
+ props.put(ARG.feedInstancePaths.getPropName(), join(outputPaths.iterator(), ','));
+ }
+
+ private SYNCDATASET createDataSet(Feed feed, Cluster cluster, Storage storage,
+ String datasetName, LocationType locationType) throws FalconException {
+
+ SYNCDATASET syncdataset = new SYNCDATASET();
+ syncdataset.setName(datasetName);
+ syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
+
+ String uriTemplate = storage.getUriTemplate(locationType);
+ if (storage.getType() == Storage.TYPE.TABLE) {
+ uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie requires this!!!
+ }
+ syncdataset.setUriTemplate(uriTemplate);
+
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
+ syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
+ syncdataset.setTimezone(feed.getTimezone().getID());
+
+ if (feed.getAvailabilityFlag() == null) {
+ syncdataset.setDoneFlag("");
+ } else {
+ syncdataset.setDoneFlag(feed.getAvailabilityFlag());
}
+
+ return syncdataset;
}
private DATAOUT createDataOut(Output output) {
@@ -261,25 +323,71 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
return datain;
}
- //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
- private void createOutputEvent(String feed, String name, Cluster cluster,
- String type, LocationType locType, COORDINATORAPP coord,
- Map<String, String> props, String instance) throws FalconException {
- SYNCDATASET dataset = createDataSet(feed, cluster, name + type,
- locType);
+ private void propagateFileSystemProperties(Output output, Feed feed, Cluster cluster, COORDINATORAPP coord,
+ Storage storage, Map<String, String> props)
+ throws FalconException {
+
+ // stats and meta paths
+ createOutputEvent(output, feed, cluster, LocationType.STATS, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.META, coord, props, storage);
+ createOutputEvent(output, feed, cluster, LocationType.TMP, coord, props, storage);
+ }
+
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ private void createOutputEvent(Output output, Feed feed, Cluster cluster, LocationType locType,
+ COORDINATORAPP coord, Map<String, String> props, Storage storage)
+ throws FalconException {
+
+ String name = output.getName();
+ String type = locType.name().toLowerCase();
+
+ SYNCDATASET dataset = createDataSet(feed, cluster, storage, name + type, locType);
coord.getDatasets().getDatasetOrAsyncDataset().add(dataset);
+
DATAOUT dataout = new DATAOUT();
- if (coord.getOutputEvents() == null) {
- coord.setOutputEvents(new OUTPUTEVENTS());
- }
dataout.setName(name + type);
dataout.setDataset(name + type);
- dataout.setInstance(getELExpression(instance));
- coord.getOutputEvents().getDataOut().add(dataout);
+ dataout.setInstance(getELExpression(output.getInstance()));
+
+ OUTPUTEVENTS outputEvents = coord.getOutputEvents();
+ if (outputEvents == null) {
+ outputEvents = new OUTPUTEVENTS();
+ coord.setOutputEvents(outputEvents);
+ }
+ outputEvents.getDataOut().add(dataout);
+
String outputExpr = "${coord:dataOut('" + name + type + "')}";
props.put(name + "." + type, outputExpr);
}
- //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ private void propagateCommonCatalogTableProperties(CatalogStorage tableStorage,
+ Map<String, String> props, String prefix) {
+ props.put(prefix + "_storage_type", tableStorage.getType().name());
+ props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+ props.put(prefix + "_database", tableStorage.getDatabase());
+ props.put(prefix + "_table", tableStorage.getTable());
+ }
+
+ private void propagateCatalogTableProperties(Input input, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + input.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+ props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+ props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+ }
+
+ private void propagateCatalogTableProperties(Output output, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + output.getName();
+
+ propagateCommonCatalogTableProperties(tableStorage, props, prefix);
+
+ props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+ }
private String join(Iterator<String> itr, char sep) {
String joinedStr = StringUtils.join(itr, sep);
@@ -289,29 +397,6 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
return joinedStr;
}
- private SYNCDATASET createDataSet(String feedName, Cluster cluster, String datasetName,
- LocationType locationType) throws FalconException {
- Feed feed = EntityUtil.getEntity(EntityType.FEED, feedName);
-
- SYNCDATASET syncdataset = new SYNCDATASET();
- syncdataset.setName(datasetName);
- String locPath = FeedHelper.getLocation(feed, locationType,
- cluster.getName()).getPath();
- syncdataset.setUriTemplate(new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}"
- + locPath);
- syncdataset.setFrequency("${coord:" + feed.getFrequency().toString() + "}");
-
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName());
- syncdataset.setInitialInstance(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()));
- syncdataset.setTimezone(feed.getTimezone().getID());
- if (feed.getAvailabilityFlag() == null) {
- syncdataset.setDoneFlag("");
- } else {
- syncdataset.setDoneFlag(feed.getAvailabilityFlag());
- }
- return syncdataset;
- }
-
private String getELExpression(String expr) {
if (expr != null) {
expr = "${" + expr + "}";
@@ -331,8 +416,8 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
return props;
}
- private void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
- String wfName, Path wfPath) throws FalconException {
+ protected void createWorkflow(Cluster cluster, Process process, Workflow processWorkflow,
+ String wfName, Path wfPath) throws FalconException {
WORKFLOWAPP wfApp = getWorkflowTemplate(DEFAULT_WF_TEMPLATE);
wfApp.setName(wfName);
try {
@@ -353,33 +438,75 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
if (engineType == EngineType.OOZIE && actionName.equals("user-oozie-workflow")) {
action.getSubWorkflow().setAppPath(storagePath);
} else if (engineType == EngineType.PIG && actionName.equals("user-pig-job")) {
- decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig());
+ decoratePIGAction(cluster, process, processWorkflow, storagePath, action.getPig(), wfPath);
+ } else if (engineType == EngineType.HIVE && actionName.equals("user-hive-job")) {
+ decorateHiveAction(cluster, process, processWorkflow, storagePath, action, wfPath);
}
}
marshal(cluster, wfApp, wfPath);
}
- private void decoratePIGAction(Cluster cluster, Process process, Workflow processWorkflow,
- String storagePath, PIG pigAction) throws FalconException {
-
+ private void decoratePIGAction(Cluster cluster, Process process,
+ Workflow processWorkflow, String storagePath,
+ PIG pigAction, Path wfPath) throws FalconException {
pigAction.setScript(storagePath);
addPrepareDeleteOutputPath(process, pigAction);
- addInputOutputFeedsAsParams(pigAction, process);
+ final List<String> paramList = pigAction.getParam();
+ addInputFeedsAsParams(paramList, process, cluster, EngineType.PIG.name().toLowerCase());
+ addOutputFeedsAsParams(paramList, process, cluster);
propagateProcessProperties(pigAction, process);
- addArchiveForCustomJars(cluster, processWorkflow, pigAction);
+ Storage.TYPE storageType = getStorageType(cluster, process);
+ if (Storage.TYPE.TABLE == storageType) {
+ // adds hive-site.xml in pig classpath
+ setupHiveConfiguration(cluster, wfPath, ""); // DO NOT ADD PREFIX!!!
+ pigAction.getFile().add("${wf:appPath()}/conf/hive-site.xml");
+ }
+
+ addArchiveForCustomJars(cluster, processWorkflow, pigAction.getArchive());
}
- private void addPrepareDeleteOutputPath(Process process, PIG pigAction) {
+ private void decorateHiveAction(Cluster cluster, Process process,
+ Workflow processWorkflow, String storagePath,
+ ACTION wfAction, Path wfPath) throws FalconException {
+
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = unMarshalHiveAction(wfAction);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ hiveAction.setScript(storagePath);
+
+ addPrepareDeleteOutputPath(process, hiveAction);
+
+ final List<String> paramList = hiveAction.getParam();
+ addInputFeedsAsParams(paramList, process, cluster, EngineType.HIVE.name().toLowerCase());
+ addOutputFeedsAsParams(paramList, process, cluster);
+
+ propagateProcessProperties(hiveAction, process);
+
+ setupHiveConfiguration(cluster, wfPath, "falcon-");
+
+ addArchiveForCustomJars(cluster, processWorkflow, hiveAction.getArchive());
+
+ marshalHiveAction(wfAction, actionJaxbElement);
+ }
+
+ private void addPrepareDeleteOutputPath(Process process,
+ PIG pigAction) throws FalconException {
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
final PREPARE prepare = new PREPARE();
final List<DELETE> deleteList = prepare.getDelete();
- for (Output output : process.getOutputs().getOutputs()) {
+
+ for (String deletePath : deleteOutputPathList) {
final DELETE delete = new DELETE();
- delete.setPath("${wf:conf('" + output.getName() + "')}");
+ delete.setPath(deletePath);
deleteList.add(delete);
}
@@ -388,14 +515,83 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
}
}
- private void addInputOutputFeedsAsParams(PIG pigAction, Process process) throws FalconException {
- final List<String> paramList = pigAction.getParam();
+ private void addPrepareDeleteOutputPath(Process process,
+ org.apache.falcon.oozie.hive.ACTION hiveAction)
+ throws FalconException {
+
+ List<String> deleteOutputPathList = getPrepareDeleteOutputPathList(process);
+ if (deleteOutputPathList.isEmpty()) {
+ return;
+ }
+
+ org.apache.falcon.oozie.hive.PREPARE prepare = new org.apache.falcon.oozie.hive.PREPARE();
+ List<org.apache.falcon.oozie.hive.DELETE> deleteList = prepare.getDelete();
+
+ for (String deletePath : deleteOutputPathList) {
+ org.apache.falcon.oozie.hive.DELETE delete = new org.apache.falcon.oozie.hive.DELETE();
+ delete.setPath(deletePath);
+ deleteList.add(delete);
+ }
+
+ if (!deleteList.isEmpty()) {
+ hiveAction.setPrepare(prepare);
+ }
+ }
+
+ private List<String> getPrepareDeleteOutputPathList(Process process) throws FalconException {
+ final List<String> deleteList = new ArrayList<String>();
+ for (Output output : process.getOutputs().getOutputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+
+ if (FeedHelper.getStorageType(feed) == Storage.TYPE.TABLE) {
+ continue; // prepare delete only applies to FileSystem storage
+ }
+
+ deleteList.add("${wf:conf('" + output.getName() + "')}");
+ }
+
+ return deleteList;
+ }
+
+ private void addInputFeedsAsParams(List<String> paramList, Process process, Cluster cluster,
+ String engineType) throws FalconException {
for (Input input : process.getInputs().getInputs()) {
- paramList.add(input.getName() + "=${" + input.getName() + "}");
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ final String inputName = input.getName();
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ paramList.add(inputName + "=${" + inputName + "}"); // no prefix for backwards compatibility
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ final String paramName = "falcon_" + inputName; // prefix 'falcon' for new params
+ Map<String, String> props = new HashMap<String, String>();
+ propagateCommonCatalogTableProperties((CatalogStorage) storage, props, paramName);
+ for (String key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+
+ paramList.add(paramName + "_filter=${wf:conf('"
+ + paramName + "_partition_filter_" + engineType + "')}");
+ }
}
+ }
+ private void addOutputFeedsAsParams(List<String> paramList, Process process,
+ Cluster cluster) throws FalconException {
for (Output output : process.getOutputs().getOutputs()) {
- paramList.add(output.getName() + "=${" + output.getName() + "}");
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
+ Storage storage = FeedHelper.createStorage(cluster, feed);
+
+ if (storage.getType() == Storage.TYPE.FILESYSTEM) {
+ final String outputName = output.getName(); // no prefix for backwards compatibility
+ paramList.add(outputName + "=${" + outputName + "}");
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ Map<String, String> props = new HashMap<String, String>();
+ propagateCatalogTableProperties(output, (CatalogStorage) storage, props); // prefix is auto added
+ for (String key : props.keySet()) {
+ paramList.add(key + "=${wf:conf('" + key + "')}");
+ }
+ }
}
}
@@ -408,24 +604,79 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
// Propagate user defined properties to job configuration
final List<org.apache.falcon.oozie.workflow.CONFIGURATION.Property> configuration =
pigAction.getConfiguration().getProperty();
+
+ // Propagate user defined properties to pig script as macros
+ // passed as parameters -p name=value that can be accessed as $name
+ final List<String> paramList = pigAction.getParam();
+
for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
org.apache.falcon.oozie.workflow.CONFIGURATION.Property configProperty =
new org.apache.falcon.oozie.workflow.CONFIGURATION.Property();
configProperty.setName(property.getName());
configProperty.setValue(property.getValue());
configuration.add(configProperty);
+
+ paramList.add(property.getName() + "=" + property.getValue());
}
+ }
+
+ private void propagateProcessProperties(org.apache.falcon.oozie.hive.ACTION hiveAction, Process process) {
+ org.apache.falcon.entity.v0.process.Properties processProperties = process.getProperties();
+ if (processProperties == null) {
+ return;
+ }
+
+ // Propagate user defined properties to job configuration
+ final List<org.apache.falcon.oozie.hive.CONFIGURATION.Property> configuration =
+ hiveAction.getConfiguration().getProperty();
// Propagate user defined properties to pig script as macros
// passed as parameters -p name=value that can be accessed as $name
- final List<String> paramList = pigAction.getParam();
+ final List<String> paramList = hiveAction.getParam();
+
for (org.apache.falcon.entity.v0.process.Property property : processProperties.getProperties()) {
+ org.apache.falcon.oozie.hive.CONFIGURATION.Property configProperty =
+ new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+ configProperty.setName(property.getName());
+ configProperty.setValue(property.getValue());
+ configuration.add(configProperty);
+
paramList.add(property.getName() + "=" + property.getValue());
}
}
+ private Storage.TYPE getStorageType(Cluster cluster, Process process) throws FalconException {
+ Storage.TYPE storageType = Storage.TYPE.FILESYSTEM;
+ if (process.getInputs() == null) {
+ return storageType;
+ }
+
+ for (Input input : process.getInputs().getInputs()) {
+ Feed feed = EntityUtil.getEntity(EntityType.FEED, input.getFeed());
+ storageType = FeedHelper.getStorageType(feed, cluster);
+ if (Storage.TYPE.TABLE == storageType) {
+ break;
+ }
+ }
+
+ return storageType;
+ }
+
+ // creates hive-site.xml configuration in conf dir.
+ private void setupHiveConfiguration(Cluster cluster, Path wfPath,
+ String prefix) throws FalconException {
+ String catalogUrl = ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).getEndpoint();
+ try {
+ FileSystem fs = FileSystem.get(ClusterHelper.getConfiguration(cluster));
+ Path confPath = new Path(wfPath, "conf");
+ createHiveConf(fs, confPath, catalogUrl, prefix);
+ } catch (IOException e) {
+ throw new FalconException(e);
+ }
+ }
+
private void addArchiveForCustomJars(Cluster cluster, Workflow processWorkflow,
- PIG pigAction) throws FalconException {
+ List<String> archiveList) throws FalconException {
String processWorkflowLib = processWorkflow.getLib();
if (processWorkflowLib == null) {
return;
@@ -435,7 +686,7 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
try {
final FileSystem fs = libPath.getFileSystem(ClusterHelper.getConfiguration(cluster));
if (fs.isFile(libPath)) { // File, not a Dir
- pigAction.getArchive().add(processWorkflowLib);
+ archiveList.add(processWorkflowLib);
return;
}
@@ -452,10 +703,34 @@ public class OozieProcessMapper extends AbstractOozieEntityMapper<Process> {
});
for (FileStatus fileStatus : fileStatuses) {
- pigAction.getArchive().add(fileStatus.getPath().toString());
+ archiveList.add(fileStatus.getPath().toString());
}
} catch (IOException e) {
throw new FalconException("Error adding archive for custom jars under: " + libPath, e);
}
}
+
+ @SuppressWarnings("unchecked")
+ protected JAXBElement<org.apache.falcon.oozie.hive.ACTION> unMarshalHiveAction(ACTION wfAction) {
+ try {
+ Unmarshaller unmarshaller = HIVE_ACTION_JAXB_CONTEXT.createUnmarshaller();
+ unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
+ return (JAXBElement<org.apache.falcon.oozie.hive.ACTION>)
+ unmarshaller.unmarshal((ElementNSImpl) wfAction.getAny());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to unmarshall hive action.", e);
+ }
+ }
+
+ protected void marshalHiveAction(ACTION wfAction,
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionjaxbElement) {
+ try {
+ DOMResult hiveActionDOM = new DOMResult();
+ Marshaller marshaller = HIVE_ACTION_JAXB_CONTEXT.createMarshaller();
+ marshaller.marshal(actionjaxbElement, hiveActionDOM);
+ wfAction.setAny(((Document) hiveActionDOM.getNode()).getDocumentElement());
+ } catch (JAXBException e) {
+ throw new RuntimeException("Unable to marshall hive action.", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 3f70557..1329733 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -73,9 +73,9 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
properties.put(inName + ".initial-instance", SchemaHelper.formatDateUTC(cluster.getValidity().getStart()));
properties.put(inName + ".done-flag", "notused");
- String locPath = FeedHelper.getLocation(feed, LocationType.DATA, clusterName).getPath().replace('$', '%');
- properties.put(inName + ".uri-template",
- new Path(locPath).toUri().getScheme() != null ? locPath : "${nameNode}" + locPath);
+ String locPath = FeedHelper.createStorage(clusterName, feed)
+ .getUriTemplate(LocationType.DATA).replace('$', '%');
+ properties.put(inName + ".uri-template", locPath);
properties.put(inName + ".start-instance", in.getStart());
properties.put(inName + ".end-instance", in.getEnd());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/main/resources/config/workflow/process-parent-workflow.xml
----------------------------------------------------------------------
diff --git a/process/src/main/resources/config/workflow/process-parent-workflow.xml b/process/src/main/resources/config/workflow/process-parent-workflow.xml
index eeffdd5..494bf20 100644
--- a/process/src/main/resources/config/workflow/process-parent-workflow.xml
+++ b/process/src/main/resources/config/workflow/process-parent-workflow.xml
@@ -38,6 +38,15 @@
<name>oozie.launcher.mapred.job.priority</name>
<value>${jobPriority}</value>
</property>
+ <!-- HCatalog jars -->
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>hcatalog</value>
+ </property>
</configuration>
<main-class>org.apache.falcon.latedata.LateDataHandler</main-class>
<arg>-out</arg>
@@ -46,6 +55,8 @@
<arg>${falconInPaths}</arg>
<arg>-falconInputFeeds</arg>
<arg>${falconInputFeeds}</arg>
+ <arg>-falconInputFeedStorageTypes</arg>
+ <arg>${falconInputFeedStorageTypes}</arg>
<capture-output/>
</java>
<ok to="user-workflow"/>
@@ -60,6 +71,9 @@
<case to="user-pig-job">
${userWorkflowEngine=="pig"}
</case>
+ <case to="user-hive-job">
+ ${userWorkflowEngine=="hive"}
+ </case>
<default to="user-oozie-workflow"/>
</switch>
</decision>
@@ -76,12 +90,40 @@
<name>oozie.launcher.mapred.job.priority</name>
<value>${jobPriority}</value>
</property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.pig</name>
+ <value>pig,hcatalog</value>
+ </property>
</configuration>
<script>#USER_WF_PATH#</script>
</pig>
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
</action>
+ <action name="user-hive-job">
+ <hive xmlns="uri:oozie:hive-action:0.2">
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <job-xml>${wf:appPath()}/conf/falcon-hive-site.xml</job-xml>
+ <configuration>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ </configuration>
+ <script>#USER_WF_PATH#</script>
+ </hive>
+ <ok to="succeeded-post-processing"/>
+ <error to="failed-post-processing"/>
+ </action>
<action name='user-oozie-workflow'>
<sub-workflow>
<app-path>#USER_WF_PATH#</app-path>
@@ -90,7 +132,6 @@
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
</action>
-
<action name='succeeded-post-processing'>
<java>
<job-tracker>${jobTracker}</job-tracker>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 9d13d79..b6f03e5 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -18,28 +18,14 @@
package org.apache.falcon.converter;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.List;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-
+import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
@@ -48,6 +34,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
@@ -56,17 +44,37 @@ import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.DECISION;
+import org.apache.falcon.oozie.workflow.PIG;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
-import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Test for the Falcon entities mapping into Oozie artifacts.
*/
@@ -89,6 +97,7 @@ public class OozieProcessMapperTest extends AbstractTestBase {
ConfigurationStore store = ConfigurationStore.get();
Cluster cluster = store.get(EntityType.CLUSTER, "corp");
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
+ ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
fs = new Path(hdfsUrl).getFileSystem(new Configuration());
fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
@@ -145,19 +154,20 @@ public class OozieProcessMapperTest extends AbstractTestBase {
ConfigurationStore store = ConfigurationStore.get();
Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
- assertEquals(SchemaHelper.formatDateUTC(feed.getClusters().getClusters().get(0).getValidity().getStart()),
- ds.getInitialInstance());
+
+ final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
+ assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
assertEquals(feed.getTimezone().getID(), ds.getTimezone());
assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
assertEquals("", ds.getDoneFlag());
- assertEquals(ds.getUriTemplate(), "${nameNode}" + FeedHelper.getLocation(feed, LocationType.DATA,
- feed.getClusters().getClusters().get(0).getName()).getPath());
+ assertEquals(ds.getUriTemplate(),
+ FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
+
+ HashMap<String, String> props = new HashMap<String, String>();
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- if (prop.getName().equals("mapred.job.priority")) {
- assertEquals(prop.getValue(), "LOW");
- break;
- }
+ props.put(prop.getName(), prop.getValue());
}
+ assertEquals(props.get("mapred.job.priority"), "LOW");
assertLibExtensions(coord);
}
@@ -194,16 +204,168 @@ public class OozieProcessMapperTest extends AbstractTestBase {
List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
- ACTION pigAction = (ACTION) decisionOrForkOrJoin.get(3);
- Assert.assertEquals("user-pig-job", pigAction.getName());
- Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getPig().getScript());
- Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getPig().getArchive());
+ ACTION pigActionNode = (ACTION) decisionOrForkOrJoin.get(3);
+ Assert.assertEquals("user-pig-job", pigActionNode.getName());
- ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(4);
+ final PIG pigAction = pigActionNode.getPig();
+ Assert.assertEquals("${nameNode}/apps/pig/id.pig", pigAction.getScript());
+ Assert.assertNotNull(pigAction.getPrepare());
+ Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
+ Assert.assertFalse(pigAction.getParam().isEmpty());
+ Assert.assertEquals(5, pigAction.getParam().size());
+ Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
+ Assert.assertTrue(pigAction.getFile().size() > 0);
+
+ ACTION oozieAction = (ACTION) decisionOrForkOrJoin.get(5);
Assert.assertEquals("user-oozie-workflow", oozieAction.getName());
Assert.assertEquals("#USER_WF_PATH#", oozieAction.getSubWorkflow().getAppPath());
}
+ /**
+ * Maps a hive-engine process whose input and output are catalog-table feeds and verifies the
+ * generated bundle, the coordinator's table properties and the user-hive-job action of the
+ * parent workflow.
+ */
+ @Test
+ public void testHiveProcessMapper() throws Exception {
+ URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
+ Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, inFeed);
+
+ resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+ Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.FEED, outFeed);
+
+ resource = this.getClass().getResource("/config/process/hive-process.xml");
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
+ ConfigurationStore.get().publish(EntityType.PROCESS, process);
+
+ Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, "corp");
+ OozieProcessMapper mapper = new OozieProcessMapper(process);
+ // stage under the filesystem root like the table-storage test, not a developer-specific directory
+ Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
+ mapper.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+ String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
+
+ COORDINATORAPP coord = getCoordinator(new Path(coordPath));
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+
+ // verify table props: only keys that the expectation map knows about are compared
+ Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (expected.containsKey(entry.getKey())) {
+ Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
+ }
+ }
+
+ String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
+ WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
+ testParentWorkflow(process, parentWorkflow);
+
+ List<Object> decisionOrForkOrJoin = parentWorkflow.getDecisionOrForkOrJoin();
+
+ ACTION hiveNode = (ACTION) decisionOrForkOrJoin.get(4);
+ Assert.assertEquals("user-hive-job", hiveNode.getName());
+
+ // the hive action is read back through the mapper's own unmarshaller
+ JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = mapper.unMarshalHiveAction(hiveNode);
+ org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
+
+ Assert.assertEquals("${nameNode}/apps/hive/script.hql", hiveAction.getScript());
+ Assert.assertNull(hiveAction.getPrepare());
+ Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
+ Assert.assertFalse(hiveAction.getParam().isEmpty());
+ Assert.assertEquals(10, hiveAction.getParam().size());
+ }
+
+ /**
+ * Maps a pig-engine process whose input and output are catalog-table feeds, then checks the
+ * table-storage, late-data and post-processing properties handed to the coordinator workflow.
+ */
+ @Test
+ public void testProcessMapperForTableStorage() throws Exception {
+ final ConfigurationStore store = ConfigurationStore.get();
+
+ final Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+ this.getClass().getResource("/config/feed/hive-table-feed.xml"));
+ store.publish(EntityType.FEED, inFeed);
+
+ final Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(
+ this.getClass().getResource("/config/feed/hive-table-feed-out.xml"));
+ store.publish(EntityType.FEED, outFeed);
+
+ final Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
+ this.getClass().getResource("/config/process/pig-process-table.xml"));
+ store.publish(EntityType.PROCESS, process);
+
+ final Cluster cluster = store.get(EntityType.CLUSTER, "corp");
+ final OozieProcessMapper mapper = new OozieProcessMapper(process);
+ final Path bundlePath = new Path("/", EntityUtil.getStagingPath(process));
+ mapper.map(cluster, bundlePath);
+ assertTrue(fs.exists(bundlePath));
+
+ // bundle: exactly one coordinator, named after the default-tagged workflow
+ final BUNDLEAPP bundle = getBundle(bundlePath);
+ assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
+ assertEquals(1, bundle.getCoordinator().size());
+ assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
+ bundle.getCoordinator().get(0).getName());
+
+ final String coordAppPath = bundle.getCoordinator().get(0).getAppPath();
+ final COORDINATORAPP coord = getCoordinator(new Path(coordAppPath.replace("${nameNode}", "")));
+
+ final Map<String, String> wfProps = new HashMap<String, String>();
+ for (Property property : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ wfProps.put(property.getName(), property.getValue());
+ }
+
+ // table storage props: compare only the keys the expectation map knows about
+ final Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process, cluster);
+ for (String key : wfProps.keySet()) {
+ if (expected.containsKey(key)) {
+ Assert.assertEquals(wfProps.get(key), expected.get(key));
+ }
+ }
+
+ // late data params come from the first (only) process input
+ final Input firstInput = process.getInputs().getInputs().get(0);
+ Assert.assertEquals(wfProps.get("falconInputFeeds"), firstInput.getName());
+ Assert.assertEquals(wfProps.get("falconInPaths"), "${coord:dataIn('input')}");
+ Assert.assertEquals(wfProps.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+
+ // post processing params come from the first (only) process output
+ final Output firstOutput = process.getOutputs().getOutputs().get(0);
+ Assert.assertEquals(wfProps.get("feedNames"), firstOutput.getName());
+ Assert.assertEquals(wfProps.get("feedInstancePaths"), "${coord:dataOut('output')}");
+ }
+
+ private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed, Process process,
+ Cluster cluster) throws FalconException {
+ Map<String, String> expected = new HashMap<String, String>();
+ for (Input input : process.getInputs().getInputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, inFeed);
+ propagateStorageProperties(input.getName(), storage, expected);
+ }
+
+ for (Output output : process.getOutputs().getOutputs()) {
+ CatalogStorage storage = (CatalogStorage) FeedHelper.createStorage(cluster, outFeed);
+ propagateStorageProperties(output.getName(), storage, expected);
+ }
+
+ return expected;
+ }
+
+ /**
+ * Adds the expected catalog-table properties for one process input/output to props, keyed as
+ * "falcon_&lt;feedName&gt;_..." (storage type, catalog url, database, table). For the feed named
+ * "input" it also adds the pig/hive/java partition-filter EL expressions, and for the feed
+ * named "output" the data-out partitions EL expression.
+ */
+ private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
+ Map<String, String> props) {
+ String prefix = "falcon_" + feedName;
+ props.put(prefix + "_storage_type", tableStorage.getType().name());
+ props.put(prefix + "_catalog_url", tableStorage.getCatalogUrl());
+ props.put(prefix + "_database", tableStorage.getDatabase());
+ props.put(prefix + "_table", tableStorage.getTable());
+
+ // branch on the bare feed name: prefix is "falcon_" + feedName and can never equal "input"/"output"
+ if ("input".equals(feedName)) {
+ props.put(prefix + "_partition_filter_pig", "${coord:dataInPartitionFilter('input', 'pig')}");
+ props.put(prefix + "_partition_filter_hive", "${coord:dataInPartitionFilter('input', 'hive')}");
+ props.put(prefix + "_partition_filter_java", "${coord:dataInPartitionFilter('input', 'java')}");
+ } else if ("output".equals(feedName)) {
+ props.put(prefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
private void assertLibExtensions(COORDINATORAPP coord) throws Exception {
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
JAXBContext jaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
@@ -261,8 +423,11 @@ public class OozieProcessMapperTest extends AbstractTestBase {
Assert.assertEquals("should-record", ((DECISION) decisionOrForkOrJoin.get(0)).getName());
Assert.assertEquals("recordsize", ((ACTION) decisionOrForkOrJoin.get(1)).getName());
Assert.assertEquals("user-workflow", ((DECISION) decisionOrForkOrJoin.get(2)).getName());
- Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
- Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+ Assert.assertEquals("user-pig-job", ((ACTION) decisionOrForkOrJoin.get(3)).getName());
+ Assert.assertEquals("user-hive-job", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
+ Assert.assertEquals("user-oozie-workflow", ((ACTION) decisionOrForkOrJoin.get(5)).getName());
+ Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
+ Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
}
private COORDINATORAPP getCoordinator(Path path) throws Exception {
@@ -277,16 +442,14 @@ public class OozieProcessMapperTest extends AbstractTestBase {
return jaxbBundle.getValue();
}
+ /**
+ * Reads workflow.xml from the given workflow directory and unmarshals it into a WORKFLOWAPP.
+ * NOTE(review): no XSD schema is set on the unmarshaller, so the document is not validated
+ * against an Oozie workflow schema here; getBytes() also uses the platform default charset.
+ */
+ @SuppressWarnings("unchecked")
private WORKFLOWAPP getParentWorkflow(Path path) throws Exception {
String workflow = readFile(new Path(path, "workflow.xml"));
- Unmarshaller unmarshaller = JAXBContext.newInstance(WORKFLOWAPP.class).createUnmarshaller();
- SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
- Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-workflow-0.3.xsd"));
- unmarshaller.setSchema(schema);
- JAXBElement<WORKFLOWAPP> jaxbWorkflow = unmarshaller.unmarshal(
- new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())), WORKFLOWAPP.class);
- return jaxbWorkflow.getValue();
+ JAXBContext wfAppContext = JAXBContext.newInstance(WORKFLOWAPP.class);
+ Unmarshaller unmarshaller = wfAppContext.createUnmarshaller();
+ // unchecked cast: unmarshal(Source) returns Object, expected to be a JAXBElement<WORKFLOWAPP>
+ return ((JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ new StreamSource(new ByteArrayInputStream(workflow.trim().getBytes())))).getValue();
}
private BUNDLEAPP getBundle(Path path) throws Exception {
@@ -312,8 +475,11 @@ public class OozieProcessMapperTest extends AbstractTestBase {
}
+ /**
+ * Runs the base cleanup after every test and unregisters the table feeds and the processes the
+ * table-storage tests publish. Both table tests publish the same feed names, and removing an
+ * entity that was never published is tolerated because this runs after every test.
+ */
@Override
- @AfterClass
+ @AfterMethod
public void cleanup() throws Exception {
super.cleanup();
+ // processes first, then the feeds they reference; hive-process is published by testHiveProcessMapper
+ ConfigurationStore.get().remove(EntityType.PROCESS, "table-process");
+ ConfigurationStore.get().remove(EntityType.PROCESS, "hive-process");
+ ConfigurationStore.get().remove(EntityType.FEED, "clicks-raw-table");
+ ConfigurationStore.get().remove(EntityType.FEED, "clicks-summary-table");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/feed/hive-table-feed-out.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/feed/hive-table-feed-out.xml b/process/src/test/resources/config/feed/hive-table-feed-out.xml
new file mode 100644
index 0000000..bd93a01
--- /dev/null
+++ b/process/src/test/resources/config/feed/hive-table-feed-out.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks summary table " name="clicks-summary-table" xmlns="uri:falcon:feed:0.1">
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="corp" type="source" partition="*/${cluster.colo}">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="backupCluster" type="target">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(6)" action="archive"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:default:clicks-summary#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/feed/hive-table-feed.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/feed/hive-table-feed.xml b/process/src/test/resources/config/feed/hive-table-feed.xml
new file mode 100644
index 0000000..66d0742
--- /dev/null
+++ b/process/src/test/resources/config/feed/hive-table-feed.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<feed description="clicks log table " name="clicks-raw-table" xmlns="uri:falcon:feed:0.1">
+ <groups>online,bi</groups>
+
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+ <late-arrival cut-off="hours(6)"/>
+
+ <clusters>
+ <cluster name="corp" type="source" partition="*/${cluster.colo}">
+ <validity start="2021-11-01T00:00Z" end="2021-12-31T00:00Z"/>
+ <retention limit="hours(48)" action="delete"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ <cluster name="backupCluster" type="target">
+ <validity start="2011-11-01T00:00Z" end="2011-12-31T00:00Z"/>
+ <retention limit="hours(6)" action="archive"/>
+ <!-- Limit can be in Time or Instances 100, Action ENUM DELETE,ARCHIVE -->
+ </cluster>
+ </clusters>
+
+ <table uri="catalog:default:clicks#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}" />
+
+ <ACL owner="testuser" group="group" permission="0x755"/>
+ <schema location="/schema/clicks" provider="protobuf"/>
+</feed>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/hive-process.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/hive-process.xml b/process/src/test/resources/config/process/hive-process.xml
new file mode 100644
index 0000000..4dac8e9
--- /dev/null
+++ b/process/src/test/resources/config/process/hive-process.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="hive-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what -->
+ <inputs>
+ <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+ </inputs>
+
+ <outputs>
+ <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <workflow engine="hive" path="/apps/hive/script.hql"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/pig-process-table.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/pig-process-table.xml b/process/src/test/resources/config/process/pig-process-table.xml
new file mode 100644
index 0000000..37aca10
--- /dev/null
+++ b/process/src/test/resources/config/process/pig-process-table.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="table-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="corp">
+ <validity start="2011-11-02T00:00Z" end="2011-12-30T00:00Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>hours(1)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what -->
+ <inputs>
+ <input name="input" feed="clicks-raw-table" start="yesterday(0,0)" end="yesterday(20,0)"/>
+ </inputs>
+
+ <outputs>
+ <output name="output" feed="clicks-summary-table" instance="today(0,0)"/>
+ </outputs>
+
+ <!-- how -->
+ <workflow engine="pig" path="/apps/pig/id.pig"/>
+
+ <retry policy="periodic" delay="minutes(10)" attempts="3"/>
+</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/process/src/test/resources/config/process/process-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/process/process-0.1.xml b/process/src/test/resources/config/process/process-0.1.xml
index 91d5e0f..975d1a4 100644
--- a/process/src/test/resources/config/process/process-0.1.xml
+++ b/process/src/test/resources/config/process/process-0.1.xml
@@ -40,6 +40,6 @@
<late-process policy="exp-backoff" delay="hours(1)">
<late-input input="impression" workflow-path="hdfs://impression/late/workflow"/>
- <late-input input="clicks" workflow-path="hdfs://clicks/late/workflow"/>
+ <late-input input="click" workflow-path="hdfs://clicks/late/workflow"/>
</late-process>
</process>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 200cb2f..07175e6 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -68,6 +68,11 @@
<dependencies>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-common</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
----------------------------------------------------------------------
diff --git a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
index 5c51b5f..17227bf 100644
--- a/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
+++ b/replication/src/main/java/org/apache/falcon/replication/FeedReplicator.java
@@ -18,6 +18,7 @@
package org.apache.falcon.replication;
import org.apache.commons.cli.*;
+import org.apache.falcon.entity.Storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -48,8 +49,8 @@ public class FeedReplicator extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
-
- DistCpOptions options = getDistCpOptions(args);
+ CommandLine cmd = getCommand(args);
+ DistCpOptions options = getDistCpOptions(cmd);
Configuration conf = this.getConf();
// inject wf configs
@@ -60,10 +61,68 @@ public class FeedReplicator extends Configured implements Tool {
+ confPath.getFileSystem(conf).exists(confPath));
conf.addResource(confPath);
- DistCp distCp = new CustomReplicator(conf, options);
+ String falconFeedStorageType = cmd.getOptionValue("falconFeedStorageType").trim();
+ Storage.TYPE feedStorageType = Storage.TYPE.valueOf(falconFeedStorageType);
+
+ DistCp distCp = (feedStorageType == Storage.TYPE.FILESYSTEM)
+ ? new CustomReplicator(conf, options)
+ : new DistCp(conf, options);
LOG.info("Started DistCp");
distCp.execute();
+ if (feedStorageType == Storage.TYPE.FILESYSTEM) {
+ executePostProcessing(options); // this only applies for FileSystem Storage.
+ }
+
+ LOG.info("Completed DistCp");
+ return 0;
+ }
+
+ /**
+ * Parses the replicator command line. All options are required: -maxMaps, -sourcePaths
+ * (comma separated), -targetPath and -falconFeedStorageType.
+ *
+ * @param args raw command line arguments
+ * @return the parsed command line
+ * @throws ParseException if a required option is missing or the arguments cannot be parsed
+ */
+ protected CommandLine getCommand(String[] args) throws ParseException {
+ Options options = new Options();
+ Option opt = new Option("maxMaps", true,
+ "max number of maps to use for this copy");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("sourcePaths", true,
+ "comma separated list of source paths to be copied");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("targetPath", true, "target path");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ opt = new Option("falconFeedStorageType", true, "feed storage type");
+ opt.setRequired(true);
+ options.addOption(opt);
+
+ return new GnuParser().parse(options, args);
+ }
+
+ /**
+ * Builds DistCp options from the parsed command line. Sync-folder and blocking mode are always
+ * enabled, and the map count comes from -maxMaps.
+ *
+ * @param cmd command line produced by {@link #getCommand(String[])}
+ * @return options for a blocking, sync-folder DistCp run
+ * @throws NumberFormatException if -maxMaps is not an integer
+ */
+ protected DistCpOptions getDistCpOptions(CommandLine cmd) {
+ String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
+ List<Path> srcPaths = getPaths(paths);
+ String trgPath = cmd.getOptionValue("targetPath").trim();
+
+ DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(trgPath));
+ distcpOptions.setSyncFolder(true);
+ distcpOptions.setBlocking(true);
+ // trim like the other options so stray whitespace does not raise NumberFormatException;
+ // parseInt avoids boxing into an Integer only to unbox it again
+ distcpOptions.setMaxMaps(Integer.parseInt(cmd.getOptionValue("maxMaps").trim()));
+
+ return distcpOptions;
+ }
+
+ /**
+ * Wraps each raw path string in a Hadoop Path, preserving input order.
+ */
+ private List<Path> getPaths(String[] paths) {
+ List<Path> result = new ArrayList<Path>(paths.length);
+ for (int i = 0; i < paths.length; i++) {
+ result.add(new Path(paths[i]));
+ }
+ return result;
+ }
+
+ private void executePostProcessing(DistCpOptions options) throws IOException {
Path targetPath = options.getTargetPath();
FileSystem fs = targetPath.getFileSystem(getConf());
List<Path> inPaths = options.getSourcePaths();
@@ -87,8 +146,6 @@ public class FeedReplicator extends Configured implements Tool {
LOG.info("No files present in path: "
+ new Path(targetPath.toString() + "/" + fixedPath).toString());
}
- LOG.info("Completed DistCp");
- return 0;
}
private String getFixedPath(String relativePath) throws IOException {
@@ -113,44 +170,4 @@ public class FeedReplicator extends Configured implements Tool {
String result = resultBuffer.toString();
return result.substring(0, result.lastIndexOf('/'));
}
-
- public DistCpOptions getDistCpOptions(String[] args) throws ParseException {
- Options options = new Options();
- Option opt;
- opt = new Option("maxMaps", true,
- "max number of maps to use for this copy");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("sourcePaths", true,
- "comma separtated list of source paths to be copied");
- opt.setRequired(true);
- options.addOption(opt);
-
- opt = new Option("targetPath", true, "target path");
- opt.setRequired(true);
- options.addOption(opt);
-
- CommandLine cmd = new GnuParser().parse(options, args);
- String[] paths = cmd.getOptionValue("sourcePaths").trim().split(",");
- List<Path> srcPaths = getPaths(paths);
- String trgPath = cmd.getOptionValue("targetPath").trim();
-
- DistCpOptions distcpOptions = new DistCpOptions(srcPaths, new Path(
- trgPath));
- distcpOptions.setSyncFolder(true);
- distcpOptions.setBlocking(true);
- distcpOptions
- .setMaxMaps(Integer.valueOf(cmd.getOptionValue("maxMaps")));
-
- return distcpOptions;
- }
-
- private List<Path> getPaths(String[] paths) {
- List<Path> listPaths = new ArrayList<Path>();
- for (String path : paths) {
- listPaths.add(new Path(path));
- }
- return listPaths;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
----------------------------------------------------------------------
diff --git a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
index 67795b5..b8b39ad 100644
--- a/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
+++ b/replication/src/test/java/org/apache/falcon/replication/FeedReplicatorTest.java
@@ -17,6 +17,8 @@
*/
package org.apache.falcon.replication;
+import org.apache.commons.cli.CommandLine;
+import org.apache.falcon.entity.Storage;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCpOptions;
import org.testng.Assert;
@@ -38,11 +40,17 @@ public class FeedReplicatorTest {
* <arg>-sourcePaths</arg><arg>${distcpSourcePaths}</arg>
* <arg>-targetPath</arg><arg>${distcpTargetPaths}</arg>
*/
+ final String[] args = {
+ "true",
+ "-maxMaps", "5",
+ "-sourcePaths", "hdfs://localhost:8020/tmp/",
+ "-targetPath", "hdfs://localhost1:8020/tmp/",
+ "-falconFeedStorageType", Storage.TYPE.FILESYSTEM.name(),
+ };
+
FeedReplicator replicator = new FeedReplicator();
- DistCpOptions options = replicator.getDistCpOptions(new String[] {
- "true", "-maxMaps", "5", "-sourcePaths",
- "hdfs://localhost:8020/tmp/", "-targetPath",
- "hdfs://localhost1:8020/tmp/", });
+ CommandLine cmd = replicator.getCommand(args);
+ DistCpOptions options = replicator.getDistCpOptions(cmd);
List<Path> srcPaths = new ArrayList<Path>();
srcPaths.add(new Path("hdfs://localhost:8020/tmp/"));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index 5b758b8..4b35760 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -19,6 +19,12 @@
package org.apache.falcon.latedata;
import org.apache.commons.cli.*;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.CatalogPartition;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.Storage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -29,6 +35,7 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import java.io.*;
+import java.net.URISyntaxException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -56,48 +63,44 @@ public class LateDataHandler extends Configured implements Tool {
Option opt = new Option("out", true, "Out file name");
opt.setRequired(true);
options.addOption(opt);
+
opt = new Option("paths", true,
"Comma separated path list, further separated by #");
opt.setRequired(true);
options.addOption(opt);
+
opt = new Option("falconInputFeeds", true,
"Input feed names, further separated by #");
opt.setRequired(true);
options.addOption(opt);
+ opt = new Option("falconInputFeedStorageTypes", true,
+ "Feed storage types corresponding to Input feed names, separated by #");
+ opt.setRequired(true);
+ options.addOption(opt);
+
return new GnuParser().parse(options, args);
}
@Override
public int run(String[] args) throws Exception {
-
CommandLine command = getCommand(args);
- Path file = new Path(command.getOptionValue("out"));
- Map<String, Long> map = new LinkedHashMap<String, Long>();
String pathStr = getOptionValue(command, "paths");
if (pathStr == null) {
return 0;
}
+ String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split("#");
String[] pathGroups = pathStr.split("#");
- String[] inputFeeds = getOptionValue(command, "falconInputFeeds").split(
- "#");
- for (int index = 0; index < pathGroups.length; index++) {
- long usage = 0;
- for (String pathElement : pathGroups[index].split(",")) {
- Path inPath = new Path(pathElement);
- usage += usage(inPath, getConf());
- }
- map.put(inputFeeds[index], usage);
- }
- LOG.info("MAP data: " + map);
+ String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
+
+ Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
+ LOG.info("MAP data: " + metrics);
+
+ Path file = new Path(command.getOptionValue("out"));
+ persistMetrics(metrics, file);
- OutputStream out = file.getFileSystem(getConf()).create(file);
- for (Map.Entry<String, Long> entry : map.entrySet()) {
- out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
- }
- out.close();
return 0;
}
@@ -109,7 +112,142 @@ public class LateDataHandler extends Configured implements Tool {
return value;
}
- public String detectChanges(Path file, Map<String, Long> map, Configuration conf)
+ private Map<String, Long> computeMetrics(String[] inputFeeds, String[] pathGroups,
+ String[] inputFeedStorageTypes) throws IOException, FalconException, URISyntaxException {
+ // One metric per path group, keyed by the feed name at the same index (insertion ordered).
+ Configuration conf = getConf();
+ Map<String, Long> metricsByFeed = new LinkedHashMap<String, Long>();
+ for (int i = 0; i < pathGroups.length; i++) {
+ long metric = computeStorageMetric(pathGroups[i], inputFeedStorageTypes[i], conf);
+ metricsByFeed.put(inputFeeds[i], metric);
+ }
+
+ return metricsByFeed;
+ }
+
+ private void persistMetrics(Map<String, Long> metrics, Path file) throws IOException {
+ OutputStream out = file.getFileSystem(getConf()).create(file);
+ try {
+ for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+ out.write((entry.getKey() + "=" + entry.getValue() + "\n").getBytes());
+ }
+ out.close(); // close() may flush buffered data, so its failure must propagate
+ out = null;
+ } finally {
+ if (out != null) { // only after a failed write or close; keep that original exception
+ try {
+ out.close();
+ } catch (IOException ignore) {
+ // best-effort cleanup; the original exception is already propagating
+ }
+ }
+ }
+ }
+
+ /**
+ * Computes the storage metric for a given feed's instance or partition.
+ * File System Storage uses the size on disk as the metric.
+ * Catalog Table Storage uses the partition's create time as the metric.
+ *
+ * The assumption is that if a partition has changed or been reinstated, the underlying
+ * metric (size or create time) changes as well.
+ *
+ * @param feedUriTemplate URI for the feed storage: comma separated file system paths or a table uri
+ * @param feedStorageType name of a {@link Storage.TYPE} constant; unknown names raise IllegalArgumentException
+ * @param conf configuration used to resolve the file system of each path
+ * @return computed metric
+ * @throws IOException if a file system cannot be read
+ * @throws FalconException if building the catalog storage or querying the catalog fails
+ * @throws URISyntaxException if the table uri cannot be parsed
+ */
+ public long computeStorageMetric(String feedUriTemplate, String feedStorageType, Configuration conf)
+ throws IOException, FalconException, URISyntaxException {
+
+ Storage.TYPE storageType = Storage.TYPE.valueOf(feedStorageType);
+
+ if (storageType == Storage.TYPE.FILESYSTEM) {
+ // usage on file system is the metric
+ return getFileSystemUsageMetric(feedUriTemplate, conf);
+ } else if (storageType == Storage.TYPE.TABLE) {
+ // todo: do this in the oozie mapper; el ${coord:dataIn('input')} returns the hcat scheme.
+ feedUriTemplate = feedUriTemplate.replace("hcat://", "thrift://"); // scheme only, not db/table names
+ // creation time of the given partition is the metric
+ return getTablePartitionCreateTimeMetric(feedUriTemplate);
+ }
+
+ throw new IllegalArgumentException("Unknown storage type: " + feedStorageType);
+ }
+
+ /**
+ * Computes the File System Storage metric: the total content length, on the
+ * file system, of every path that this feed instance's path group expands to.
+ *
+ * If this instance was reinstated, the assumption is that its size on disk
+ * changes as well.
+ *
+ * @param pathGroup comma separated list of paths (glob patterns are expanded)
+ * @param conf configuration used to resolve the file system of each path
+ * @return sum of the content lengths; paths matching nothing contribute 0
+ * @throws IOException if a file system cannot be accessed
+ */
+ private long getFileSystemUsageMetric(String pathGroup, Configuration conf)
+ throws IOException {
+ String[] pathElements = pathGroup.split(",");
+ long totalUsage = 0;
+ for (int i = 0; i < pathElements.length; i++) {
+ totalUsage += usage(new Path(pathElements[i]), conf);
+ }
+
+ return totalUsage;
+ }
+
+ private long usage(Path inPath, Configuration conf) throws IOException {
+ // Sums the content length of everything inPath's (possibly glob) pattern matches; 0 if nothing.
+ FileSystem fs = inPath.getFileSystem(conf);
+ FileStatus[] matches = fs.globStatus(inPath);
+ long total = 0;
+ if (matches != null) { // globStatus may return null when a non-glob path does not exist
+ for (FileStatus match : matches) {
+ total += fs.getContentSummary(match.getPath()).getLength();
+ }
+ }
+ return total;
+ }
+
+ /**
+ * Computes the Table Storage metric: the create time of the given partition,
+ * since there is no API in Hive or HCatalog to find if a partition has changed.
+ *
+ * If this partition was reinstated, the assumption is that its create time
+ * changes as well.
+ *
+ * @param feedUriTemplate catalog table uri; its partition spec is passed to getPartition
+ * @return create time of the partition, or 0 if the catalog returns no partition
+ * @throws IOException on I/O failure
+ * @throws URISyntaxException if the table uri cannot be parsed
+ * @throws FalconException if building the catalog storage or querying the catalog fails
+ */
+ private long getTablePartitionCreateTimeMetric(String feedUriTemplate)
+ throws IOException, URISyntaxException, FalconException {
+ CatalogStorage tableStorage =
+ (CatalogStorage) FeedHelper.createStorage(Storage.TYPE.TABLE.name(), feedUriTemplate);
+ CatalogPartition catalogPartition = CatalogServiceFactory.getCatalogService().getPartition(
+ tableStorage.getCatalogUrl(), tableStorage.getDatabase(),
+ tableStorage.getTable(), tableStorage.getPartitions());
+ return catalogPartition != null ? catalogPartition.getCreateTime() : 0;
+ }
+
+ /**
+ * This method compares the recorded metrics persisted in file against
+ * the recently computed metrics and returns the list of feeds that has changed.
+ *
+ * @param file persisted metrics from the first run
+ * @param metrics newly computed metrics
+ * @param conf configuration
+ * @return list of feed names which have changed, empty string if none have changed
+ * @throws Exception
+ */
+ public String detectChanges(Path file, Map<String, Long> metrics, Configuration conf)
throws Exception {
StringBuilder buffer = new StringBuilder();
@@ -117,7 +255,7 @@ public class LateDataHandler extends Configured implements Tool {
file.getFileSystem(conf).open(file)));
String line;
try {
- Map<String, Long> recorded = new LinkedHashMap<String, Long>();
+ Map<String, Long> recordedMetrics = new LinkedHashMap<String, Long>();
while ((line = in.readLine()) != null) {
if (line.isEmpty()) {
continue;
@@ -125,17 +263,17 @@ public class LateDataHandler extends Configured implements Tool {
int index = line.indexOf('=');
String key = line.substring(0, index);
long size = Long.parseLong(line.substring(index + 1));
- recorded.put(key, size);
+ recordedMetrics.put(key, size);
}
- for (Map.Entry<String, Long> entry : map.entrySet()) {
- if (recorded.get(entry.getKey()) == null) {
+ for (Map.Entry<String, Long> entry : metrics.entrySet()) {
+ if (recordedMetrics.get(entry.getKey()) == null) {
LOG.info("No matching key " + entry.getKey());
continue;
}
- if (!recorded.get(entry.getKey()).equals(entry.getValue())) {
- LOG.info("Recorded size:" + recorded.get(entry.getKey()) + " is different from new size"
- + entry.getValue());
+ if (!recordedMetrics.get(entry.getKey()).equals(entry.getValue())) {
+ LOG.info("Recorded size:" + recordedMetrics.get(entry.getKey())
+ + " is different from new size" + entry.getValue());
buffer.append(entry.getKey()).append(',');
}
}
@@ -149,17 +287,4 @@ public class LateDataHandler extends Configured implements Tool {
in.close();
}
}
-
- public long usage(Path inPath, Configuration conf) throws IOException {
- FileSystem fs = inPath.getFileSystem(conf);
- FileStatus[] status = fs.globStatus(inPath);
- if (status == null || status.length == 0) {
- return 0;
- }
- long totalSize = 0;
- for (FileStatus statu : status) {
- totalSize += fs.getContentSummary(statu.getPath()).getLength();
- }
- return totalSize;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
index dcde876..b5ac121 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/LaterunEvent.java
@@ -22,14 +22,14 @@ package org.apache.falcon.rerun.event;
*/
public class LaterunEvent extends RerunEvent {
- //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
- public LaterunEvent(String clusterName, String wfId, long msgInsertTime,
- long delay, String entityType, String entityName, String instance,
- int runId) {
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public LaterunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
+ String entityType, String entityName, String instance, int runId) {
super(clusterName, wfId, msgInsertTime, delay, entityType, entityName,
instance, runId);
}
- //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/17f901a6/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
index 0dcc93d..baf4601 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/event/RerunEvent.java
@@ -45,7 +45,7 @@ public class RerunEvent implements Delayed {
protected String instance;
protected int runId;
- //SUSPEND CHECKSTYLE CHECK VisibilityModifierCheck
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public RerunEvent(String clusterName, String wfId, long msgInsertTime, long delay,
String entityType, String entityName, String instance, int runId) {
this.clusterName = clusterName;
@@ -57,7 +57,7 @@ public class RerunEvent implements Delayed {
this.runId = runId;
this.entityType = entityType;
}
- //RESUME CHECKSTYLE CHECK VisibilityModifierCheck
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
public String getClusterName() {
return clusterName;