You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/23 20:31:14 UTC
[1/2] git commit: FALCON-444 Logs dir for replication workflow is
incorrect and jobs fail with permission issues. Contributed by Venkatesh
Seetharam
Repository: incubator-falcon
Updated Branches:
refs/heads/master 2129285dc -> 3d3bde558
FALCON-444 Logs dir for replication workflow is incorrect and jobs fail with permission issues. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b7afe36f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b7afe36f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b7afe36f
Branch: refs/heads/master
Commit: b7afe36f494091e283702c28af5f42ef482226a0
Parents: 2129285
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 23 11:13:44 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 23 11:13:44 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../workflow/OozieFeedWorkflowBuilder.java | 8 +-
.../converter/OozieFeedWorkflowBuilderTest.java | 10 +++
.../falcon/workflow/OozieWorkflowBuilder.java | 93 +++++++++++++-------
.../workflow/OozieProcessWorkflowBuilder.java | 2 +-
.../OozieProcessWorkflowBuilderTest.java | 20 +++--
.../apache/falcon/latedata/LateDataHandler.java | 2 +-
7 files changed, 91 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fccc9b8..0e26c7d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-444 Logs dir for replication workflow is incorrect and jobs fail
+ with permission issues (Venkatesh Seetharam)
+
FALCON-443 Process with Hive workflow engine and filesystem input feeds,
table output feed fails (Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index faaddac..6d36840 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -146,11 +146,11 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
if (feedCluster.getType() == ClusterType.SOURCE) {
String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
- Path basePath = getCoordPath(bundlePath, coordName);
+ Path coordPath = getCoordPath(bundlePath, coordName);
Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName());
// workflow is serialized to a specific dir
- Path sourceSpecificWfPath = new Path(basePath, srcCluster.getName());
+ Path sourceSpecificWfPath = new Path(coordPath, srcCluster.getName());
// Different workflow for each source since hive credentials vary for each cluster
replicationMapper.createReplicationWorkflow(
@@ -211,7 +211,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
createRetentionWorkflow(cluster, wfPath, wfName);
retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
- Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+ Map<String, String> props = createCoordDefaultConfiguration(cluster, wfName);
props.put("timeZone", entity.getTimezone().getID());
props.put("frequency", entity.getFrequency().getTimeUnit().name());
@@ -539,7 +539,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
WORKFLOW replicationWF = new WORKFLOW();
replicationWF.setAppPath(getStoragePath(wfPath.toString()));
- Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+ Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfName);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 7cb055a..64b38ff 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -211,6 +211,7 @@ public class OozieFeedWorkflowBuilderTest {
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+ Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed));
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
@@ -359,6 +360,7 @@ public class OozieFeedWorkflowBuilderTest {
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
+ Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
}
public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
@@ -472,6 +474,7 @@ public class OozieFeedWorkflowBuilderTest {
Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+ Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
@@ -575,6 +578,7 @@ public class OozieFeedWorkflowBuilderTest {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+ Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed));
assertWorkflowRetries(coord);
}
@@ -620,6 +624,7 @@ public class OozieFeedWorkflowBuilderTest {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+ Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
assertWorkflowRetries(coord);
@@ -655,4 +660,9 @@ public class OozieFeedWorkflowBuilderTest {
}
}
}
+
+ private String getLogPath(Cluster aCluster, Feed aFeed) {
+ Path logPath = EntityUtil.getLogPath(aCluster, aFeed);
+ return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index ad1af73..f5ff27a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -147,20 +147,21 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
for (COORDINATORAPP coordinatorapp : coordinators) {
Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
- String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
- EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+ marshal(cluster, coordinatorapp, coordPath);
- createLogsDir(cluster, coordPath); //create logs dir
// copy falcon libs to the workflow dir
copySharedLibs(cluster, coordinatorapp);
// add the coordinator to the bundle
COORDINATOR bundleCoord = new COORDINATOR();
bundleCoord.setName(coordinatorapp.getName());
- bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+ bundleCoord.setAppPath(getStoragePath(coordPath) + "/coordinator.xml");
bundleApp.getCoordinator().add(bundleCoord);
}
+ // create logs dir once since it's at the root of the bundle path
+ createLogsDir(cluster);
+
marshal(cluster, bundleApp, bundlePath); // write the bundle
return true;
}
@@ -237,28 +238,54 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
return conf;
}
- protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+ protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, String coordName) {
Map<String, String> props = new HashMap<String, String>();
props.put(ARG.entityName.getPropName(), entity.getName());
+ props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+
+ addBrokerProperties(cluster, props);
+
+ props.put(OozieClient.EXTERNAL_ID,
+ new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+ "${coord:nominalTime()}").getId());
+ props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+
+ addLateDataProperties(props);
+
+ addClusterProperties(cluster, props);
+
+ props.put(MR_QUEUE_NAME, "default");
+ props.put(MR_JOB_PRIORITY, "NORMAL");
+
+ //props in entity override the set props.
+ props.putAll(getEntityProperties());
+
+ // this cannot be overridden
+ props.put("logDir", getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+
+ return props;
+ }
+
+ private void addBrokerProperties(Cluster cluster, Map<String, String> props) {
props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
- String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
- "tcp://localhost:61616?daemon=true");
+
+ String falconBrokerUrl = StartupProperties.get().getProperty(
+ ARG.brokerUrl.getPropName(), "tcp://localhost:61616?daemon=true");
props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
- String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
- ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+
+ String falconBrokerImplClass = StartupProperties.get().getProperty(
+ ARG.brokerImplClass.getPropName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
- String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
- DEFAULT_BROKER_MSG_TTL.toString());
+
+ String jmsMessageTTL = StartupProperties.get().getProperty(
+ "broker.ttlInMins", DEFAULT_BROKER_MSG_TTL.toString());
props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
- props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
- props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
- props.put(OozieClient.EXTERNAL_ID,
- new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
- "${coord:nominalTime()}").getId());
- props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+ }
+
+ private void addLateDataProperties(Map<String, String> props) {
try {
if (EntityUtil.getLateProcess(entity) == null
|| EntityUtil.getLateProcess(entity).getLateInputs() == null
@@ -271,20 +298,16 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
LOG.error("Unable to get Late Process for entity: {}", entity, e);
throw new FalconRuntimException(e);
}
- props.put("entityName", entity.getName());
- props.put("entityType", entity.getEntityType().name().toLowerCase());
+ }
+
+ private void addClusterProperties(Cluster cluster, Map<String, String> props) {
props.put(ARG.cluster.getPropName(), cluster.getName());
+
if (cluster.getProperties() != null) {
for (Property prop : cluster.getProperties().getProperties()) {
props.put(prop.getName(), prop.getValue());
}
}
-
- props.put(MR_QUEUE_NAME, "default");
- props.put(MR_JOB_PRIORITY, "NORMAL");
- //props in entity override the set props.
- props.putAll(getEntityProperties());
- return props;
}
protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
@@ -322,27 +345,29 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
}
}
- private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+ private void createLogsDir(Cluster cluster) throws FalconException {
+ Path logsDir = EntityUtil.getLogPath(cluster, entity);
try {
FileSystem fs = HadoopClientFactory.get().createFileSystem(
- coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
- Path logsDir = new Path(coordPath, "../../logs");
+ ClusterHelper.getConfiguration(cluster));
+ if (fs.exists(logsDir)) {
+ return;
+ }
+
fs.mkdirs(logsDir);
// logs are copied within oozie as the user in Post Processing and hence 777 permissions
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
fs.setPermission(logsDir, permission);
} catch (Exception e) {
- throw new FalconException("Unable to create temp dir in " + coordPath, e);
+ throw new FalconException("Unable to create logs dir at: " + logsDir, e);
}
}
- protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath,
- String name) throws FalconException {
- name = (StringUtils.isEmpty(name) ? "coordinator" : name) + ".xml";
+ protected void marshal(Cluster cluster, COORDINATORAPP coord,
+ Path outPath) throws FalconException {
marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
- OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, name));
- return name;
+ OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
}
protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 5089779..70aeebd 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -280,7 +280,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
coord.setControls(controls);
// Configuration
- Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+ Map<String, String> props = createCoordDefaultConfiguration(cluster, coordName);
initializeInputPaths(cluster, entity, coord, props); // inputs
initializeOutputPaths(cluster, entity, coord, props); // outputs
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 5f0efe7..8cfa9fc 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -186,10 +186,16 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
props.put(prop.getName(), prop.getValue());
}
assertEquals(props.get("mapred.job.priority"), "LOW");
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
assertLibExtensions(coord);
}
+ private String getLogPath(Process process) {
+ Path logPath = EntityUtil.getLogPath(cluster, process);
+ return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+ }
+
@Test
public void testBundle() throws Exception {
String path = StartupProperties.get().getProperty("system.lib.location");
@@ -290,6 +296,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -321,10 +328,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
- URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
- Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-
- resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+ URL resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, outFeed);
@@ -350,6 +354,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
props.put(prop.getName(), prop.getValue());
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -385,9 +390,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.FEED, inFeed);
- resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
- Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-
resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
ConfigurationStore.get().publish(EntityType.PROCESS, process);
@@ -410,6 +412,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
props.put(prop.getName(), prop.getValue());
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -463,6 +466,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
props.put(prop.getName(), prop.getValue());
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -566,6 +570,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(process));
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -773,6 +778,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
props.put(prop.getName(), prop.getValue());
}
+ Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
String[] expected = {
EntityInstanceMessage.ARG.feedNames.getPropName(),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index bcbdb08..75de12e 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -99,9 +99,9 @@ public class LateDataHandler extends Configured implements Tool {
String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
- LOG.info("MAP data: {}", metrics);
Path file = new Path(command.getOptionValue("out"));
+ LOG.info("Persisting late data metrics: {} to file: {}", metrics, file);
persistMetrics(metrics, file);
return 0;
[2/2] git commit: FALCON-445 Propagate hive credentials defined in
cluster entity to hive-site.xml. Contributed by Sowmya Ramesh
Posted by ve...@apache.org.
FALCON-445 Propagate hive credentials defined in cluster entity to hive-site.xml. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3d3bde55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3d3bde55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3d3bde55
Branch: refs/heads/master
Commit: 3d3bde558027af1f6ceb914b396cf6a41682625e
Parents: b7afe36
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 23 11:31:03 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 23 11:31:03 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 5 ++++-
.../org/apache/falcon/entity/ClusterHelper.java | 20 ++++++++++++++++++++
.../falcon/workflow/OozieWorkflowBuilder.java | 6 ++++++
.../OozieProcessWorkflowBuilderTest.java | 3 ++-
.../resources/config/cluster/cluster-0.1.xml | 1 +
5 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e26c7d..e06fc80 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,7 +7,10 @@ Trunk (Unreleased)
NEW FEATURES
IMPROVEMENTS
- FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré
+ FALCON-445 Propagate hive credentials defined in cluster entity to
+ hive-site.xml (Sowmya Ramesh via Venkatesh Seetharam)
+
+ FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré
via Shwetha GS)
OPTIMIZATIONS
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 3bf9d95..5284d68 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
/**
* Helper to get end points relating to the cluster.
*/
@@ -135,4 +139,20 @@ public final class ClusterHelper {
}
return null;
}
+
+ public static Map<String, String> geHiveProperties(Cluster cluster) {
+ if (cluster.getProperties() != null) {
+ List<Property> properties = cluster.getProperties().getProperties();
+ if (properties != null && !properties.isEmpty()) {
+ Map<String, String> hiveProperties = new HashMap<String, String>();
+ for (Property prop : properties) {
+ if (prop.getName().startsWith("hive.")) {
+ hiveProperties.put(prop.getName(), prop.getValue());
+ }
+ }
+ return hiveProperties;
+ }
+ }
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index f5ff27a..7d84938 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -483,6 +483,12 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
"Registry interface is not defined in cluster: " + cluster.getName());
}
+ // Propagate the hive properties from cluster entity
+ Map<String, String> hiveProperties = ClusterHelper.geHiveProperties(cluster);
+ if (hiveProperties != null && !hiveProperties.isEmpty()) {
+ hiveCredentials.putAll(hiveProperties);
+ }
+
hiveCredentials.put(METASTOREURIS, metaStoreUrl);
hiveCredentials.put("hive.metastore.execute.setugi", "true");
hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 8cfa9fc..2522ca3 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -289,8 +289,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
props.put(prop.getName(), prop.getValue());
}
- // verify table props
+ // verify table and hive props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+ expected.putAll(ClusterHelper.geHiveProperties(cluster));
for (Map.Entry<String, String> entry : props.entrySet()) {
if (expected.containsKey(entry.getKey())) {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/process/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/cluster/cluster-0.1.xml b/process/src/test/resources/config/cluster/cluster-0.1.xml
index 3780b3f..032cc77 100644
--- a/process/src/test/resources/config/cluster/cluster-0.1.xml
+++ b/process/src/test/resources/config/cluster/cluster-0.1.xml
@@ -39,5 +39,6 @@
<properties>
<property name="field1" value="value1"/>
<property name="field2" value="value2"/>
+ <property name="hive.metastore.client.socket.timeout" value="20"/>
</properties>
</cluster>