You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/23 20:31:14 UTC

[1/2] git commit: FALCON-444 Logs dir for replication workflow is incorrect and jobs fail with permission issues. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 2129285dc -> 3d3bde558


FALCON-444 Logs dir for replication workflow is incorrect and jobs fail with permission issues. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/b7afe36f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/b7afe36f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/b7afe36f

Branch: refs/heads/master
Commit: b7afe36f494091e283702c28af5f42ef482226a0
Parents: 2129285
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 23 11:13:44 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 23 11:13:44 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../workflow/OozieFeedWorkflowBuilder.java      |  8 +-
 .../converter/OozieFeedWorkflowBuilderTest.java | 10 +++
 .../falcon/workflow/OozieWorkflowBuilder.java   | 93 +++++++++++++-------
 .../workflow/OozieProcessWorkflowBuilder.java   |  2 +-
 .../OozieProcessWorkflowBuilderTest.java        | 20 +++--
 .../apache/falcon/latedata/LateDataHandler.java |  2 +-
 7 files changed, 91 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fccc9b8..0e26c7d 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-444 Logs dir for replication workflow is incorrect and jobs fail
+   with permission issues (Venkatesh Seetharam)
+
    FALCON-443 Process with Hive workflow engine and filesystem input feeds,
    table output feed fails (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
index faaddac..6d36840 100644
--- a/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
+++ b/feed/src/main/java/org/apache/falcon/workflow/OozieFeedWorkflowBuilder.java
@@ -146,11 +146,11 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : entity.getClusters().getClusters()) {
                 if (feedCluster.getType() == ClusterType.SOURCE) {
                     String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, entity).toString();
-                    Path basePath = getCoordPath(bundlePath, coordName);
+                    Path coordPath = getCoordPath(bundlePath, coordName);
                     Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, feedCluster.getName());
 
                     // workflow is serialized to a specific dir
-                    Path sourceSpecificWfPath = new Path(basePath, srcCluster.getName());
+                    Path sourceSpecificWfPath = new Path(coordPath, srcCluster.getName());
 
                     // Different workflow for each source since hive credentials vary for each cluster
                     replicationMapper.createReplicationWorkflow(
@@ -211,7 +211,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             createRetentionWorkflow(cluster, wfPath, wfName);
             retentionWorkflow.setAppPath(getStoragePath(wfPath.toString()));
 
-            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfPath, wfName);
+            Map<String, String> props = createCoordDefaultConfiguration(cluster, wfName);
             props.put("timeZone", entity.getTimezone().getID());
             props.put("frequency", entity.getFrequency().getTimeUnit().name());
 
@@ -539,7 +539,7 @@ public class OozieFeedWorkflowBuilder extends OozieWorkflowBuilder<Feed> {
             WORKFLOW replicationWF = new WORKFLOW();
 
             replicationWF.setAppPath(getStoragePath(wfPath.toString()));
-            Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfPath, wfName);
+            Map<String, String> props = createCoordDefaultConfiguration(trgCluster, wfName);
             props.put("srcClusterName", srcCluster.getName());
             props.put("srcClusterColo", srcCluster.getColo());
             if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 7cb055a..64b38ff 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -211,6 +211,7 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed));
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
@@ -359,6 +360,7 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
     }
 
     public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP parentWorkflow) {
@@ -472,6 +474,7 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
         Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
         Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
 
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
@@ -575,6 +578,7 @@ public class OozieFeedWorkflowBuilderTest {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), feed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed));
 
         assertWorkflowRetries(coord);
     }
@@ -620,6 +624,7 @@ public class OozieFeedWorkflowBuilderTest {
         // verify the post processing params
         Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
         Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
+        Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
 
         assertWorkflowRetries(coord);
 
@@ -655,4 +660,9 @@ public class OozieFeedWorkflowBuilderTest {
             }
         }
     }
+
+    private String getLogPath(Cluster aCluster, Feed aFeed) {
+        Path logPath = EntityUtil.getLogPath(aCluster, aFeed);
+        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index ad1af73..f5ff27a 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -147,20 +147,21 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
 
         for (COORDINATORAPP coordinatorapp : coordinators) {
             Path coordPath = getCoordPath(bundlePath, coordinatorapp.getName());
-            String coordXmlName = marshal(cluster, coordinatorapp, coordPath,
-                EntityUtil.getWorkflowNameSuffix(coordinatorapp.getName(), entity));
+            marshal(cluster, coordinatorapp, coordPath);
 
-            createLogsDir(cluster, coordPath); //create logs dir
             // copy falcon libs to the workflow dir
             copySharedLibs(cluster, coordinatorapp);
 
             // add the coordinator to the bundle
             COORDINATOR bundleCoord = new COORDINATOR();
             bundleCoord.setName(coordinatorapp.getName());
-            bundleCoord.setAppPath(getStoragePath(coordPath) + "/" + coordXmlName);
+            bundleCoord.setAppPath(getStoragePath(coordPath) + "/coordinator.xml");
             bundleApp.getCoordinator().add(bundleCoord);
         }
 
+        // create logs dir once since it's at the root of the bundle path
+        createLogsDir(cluster);
+
         marshal(cluster, bundleApp, bundlePath); // write the bundle
         return true;
     }
@@ -237,28 +238,54 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         return conf;
     }
 
-    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, Path coordPath, String coordName) {
+    protected Map<String, String> createCoordDefaultConfiguration(Cluster cluster, String coordName) {
         Map<String, String> props = new HashMap<String, String>();
         props.put(ARG.entityName.getPropName(), entity.getName());
+        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
         props.put(ARG.nominalTime.getPropName(), NOMINAL_TIME_EL);
         props.put(ARG.timeStamp.getPropName(), ACTUAL_TIME_EL);
+
+        addBrokerProperties(cluster, props);
+
+        props.put(OozieClient.EXTERNAL_ID,
+            new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
+                "${coord:nominalTime()}").getId());
+        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+
+        addLateDataProperties(props);
+
+        addClusterProperties(cluster, props);
+
+        props.put(MR_QUEUE_NAME, "default");
+        props.put(MR_JOB_PRIORITY, "NORMAL");
+
+        //props in entity override the set props.
+        props.putAll(getEntityProperties());
+
+        // this cannot be overridden
+        props.put("logDir", getStoragePath(EntityUtil.getLogPath(cluster, entity)));
+
+        return props;
+    }
+
+    private void addBrokerProperties(Cluster cluster, Map<String, String> props) {
         props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
         props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
-        String falconBrokerUrl = StartupProperties.get().getProperty(ARG.brokerUrl.getPropName(),
-            "tcp://localhost:61616?daemon=true");
+
+        String falconBrokerUrl = StartupProperties.get().getProperty(
+                ARG.brokerUrl.getPropName(), "tcp://localhost:61616?daemon=true");
         props.put(ARG.brokerUrl.getPropName(), falconBrokerUrl);
-        String falconBrokerImplClass = StartupProperties.get().getProperty(ARG.brokerImplClass.getPropName(),
-            ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+
+        String falconBrokerImplClass = StartupProperties.get().getProperty(
+                ARG.brokerImplClass.getPropName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
         props.put(ARG.brokerImplClass.getPropName(), falconBrokerImplClass);
-        String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
-            DEFAULT_BROKER_MSG_TTL.toString());
+
+        String jmsMessageTTL = StartupProperties.get().getProperty(
+                "broker.ttlInMins", DEFAULT_BROKER_MSG_TTL.toString());
         props.put(ARG.brokerTTL.getPropName(), jmsMessageTTL);
-        props.put(ARG.entityType.getPropName(), entity.getEntityType().name());
-        props.put("logDir", getStoragePath(new Path(coordPath, "../../logs")));
-        props.put(OozieClient.EXTERNAL_ID,
-            new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
-                "${coord:nominalTime()}").getId());
-        props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+    }
+
+    private void addLateDataProperties(Map<String, String> props) {
         try {
             if (EntityUtil.getLateProcess(entity) == null
                 || EntityUtil.getLateProcess(entity).getLateInputs() == null
@@ -271,20 +298,16 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
             LOG.error("Unable to get Late Process for entity: {}", entity, e);
             throw new FalconRuntimException(e);
         }
-        props.put("entityName", entity.getName());
-        props.put("entityType", entity.getEntityType().name().toLowerCase());
+    }
+
+    private void addClusterProperties(Cluster cluster, Map<String, String> props) {
         props.put(ARG.cluster.getPropName(), cluster.getName());
+
         if (cluster.getProperties() != null) {
             for (Property prop : cluster.getProperties().getProperties()) {
                 props.put(prop.getName(), prop.getValue());
             }
         }
-
-        props.put(MR_QUEUE_NAME, "default");
-        props.put(MR_JOB_PRIORITY, "NORMAL");
-        //props in entity override the set props.
-        props.putAll(getEntityProperties());
-        return props;
     }
 
     protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
@@ -322,27 +345,29 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
         }
     }
 
-    private void createLogsDir(Cluster cluster, Path coordPath) throws FalconException {
+    private void createLogsDir(Cluster cluster) throws FalconException {
+        Path logsDir = EntityUtil.getLogPath(cluster, entity);
         try {
             FileSystem fs = HadoopClientFactory.get().createFileSystem(
-                coordPath.toUri(), ClusterHelper.getConfiguration(cluster));
-            Path logsDir = new Path(coordPath, "../../logs");
+                    ClusterHelper.getConfiguration(cluster));
+            if (fs.exists(logsDir)) {
+                return;
+            }
+
             fs.mkdirs(logsDir);
 
             // logs are copied with in oozie as the user in Post Processing and hence 777 permissions
             FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
             fs.setPermission(logsDir, permission);
         } catch (Exception e) {
-            throw new FalconException("Unable to create temp dir in " + coordPath, e);
+            throw new FalconException("Unable to create logs dir at: " + logsDir, e);
         }
     }
 
-    protected String marshal(Cluster cluster, COORDINATORAPP coord, Path outPath,
-                             String name) throws FalconException {
-        name = (StringUtils.isEmpty(name) ? "coordinator" : name) + ".xml";
+    protected void marshal(Cluster cluster, COORDINATORAPP coord,
+                           Path outPath) throws FalconException {
         marshal(cluster, new ObjectFactory().createCoordinatorApp(coord),
-                OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, name));
-        return name;
+                OozieUtils.COORD_JAXB_CONTEXT, new Path(outPath, "coordinator.xml"));
     }
 
     protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
index 5089779..70aeebd 100644
--- a/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
+++ b/process/src/main/java/org/apache/falcon/workflow/OozieProcessWorkflowBuilder.java
@@ -280,7 +280,7 @@ public class OozieProcessWorkflowBuilder extends OozieWorkflowBuilder<Process> {
         coord.setControls(controls);
 
         // Configuration
-        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordPath, coordName);
+        Map<String, String> props = createCoordDefaultConfiguration(cluster, coordName);
 
         initializeInputPaths(cluster, entity, coord, props); // inputs
         initializeOutputPaths(cluster, entity, coord, props);  // outputs

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 5f0efe7..8cfa9fc 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -186,10 +186,16 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             props.put(prop.getName(), prop.getValue());
         }
         assertEquals(props.get("mapred.job.priority"), "LOW");
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         assertLibExtensions(coord);
     }
 
+    private String getLogPath(Process process) {
+        Path logPath = EntityUtil.getLogPath(cluster, process);
+        return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+    }
+
     @Test
     public void testBundle() throws Exception {
         String path = StartupProperties.get().getProperty("system.lib.location");
@@ -290,6 +296,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -321,10 +328,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
     public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
         StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
 
-        URL resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
-        Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-
-        resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
+        URL resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
         Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.FEED, outFeed);
 
@@ -350,6 +354,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             props.put(prop.getName(), prop.getValue());
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -385,9 +390,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.FEED, inFeed);
 
-        resource = this.getClass().getResource("/config/feed/feed-0.1.xml");
-        Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
-
         resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
         Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
         ConfigurationStore.get().publish(EntityType.PROCESS, process);
@@ -410,6 +412,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             props.put(prop.getName(), prop.getValue());
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -463,6 +466,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             props.put(prop.getName(), prop.getValue());
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
         WORKFLOWAPP parentWorkflow = getParentWorkflow(new Path(wfPath));
@@ -566,6 +570,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
             }
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(process));
 
         // verify the late data params
         Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -773,6 +778,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
         for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
             props.put(prop.getName(), prop.getValue());
         }
+        Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
 
         String[] expected = {
             EntityInstanceMessage.ARG.feedNames.getPropName(),

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/b7afe36f/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
index bcbdb08..75de12e 100644
--- a/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/latedata/LateDataHandler.java
@@ -99,9 +99,9 @@ public class LateDataHandler extends Configured implements Tool {
         String[] inputFeedStorageTypes = getOptionValue(command, "falconInputFeedStorageTypes").split("#");
 
         Map<String, Long> metrics = computeMetrics(inputFeeds, pathGroups, inputFeedStorageTypes);
-        LOG.info("MAP data: {}", metrics);
 
         Path file = new Path(command.getOptionValue("out"));
+        LOG.info("Persisting late data metrics: {} to file: {}", metrics, file);
         persistMetrics(metrics, file);
 
         return 0;


[2/2] git commit: FALCON-445 Propagate hive credentials defined in cluster entity to hive-site.xml. Contributed by Sowmya Ramesh

Posted by ve...@apache.org.
FALCON-445 Propagate hive credentials defined in cluster entity to hive-site.xml. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3d3bde55
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3d3bde55
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3d3bde55

Branch: refs/heads/master
Commit: 3d3bde558027af1f6ceb914b396cf6a41682625e
Parents: b7afe36
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 23 11:31:03 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 23 11:31:03 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 ++++-
 .../org/apache/falcon/entity/ClusterHelper.java | 20 ++++++++++++++++++++
 .../falcon/workflow/OozieWorkflowBuilder.java   |  6 ++++++
 .../OozieProcessWorkflowBuilderTest.java        |  3 ++-
 .../resources/config/cluster/cluster-0.1.xml    |  1 +
 5 files changed, 33 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0e26c7d..e06fc80 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,7 +7,10 @@ Trunk (Unreleased)
   NEW FEATURES
 
   IMPROVEMENTS
-   FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré 
+   FALCON-445 Propagate hive credentials defined in cluster entity to
+   hive-site.xml (Sowmya Ramesh via Venkatesh Seetharam)
+
+   FALCON-133 Upgrade to slf4j 1.7.5 and use SLF4J logger. (Jean-Baptiste Onofré
    via Shwetha GS)
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index 3bf9d95..5284d68 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Helper to get end points relating to the cluster.
  */
@@ -135,4 +139,20 @@ public final class ClusterHelper {
         }
         return null;
     }
+
+    public static Map<String, String> geHiveProperties(Cluster cluster) {
+        if (cluster.getProperties() != null) {
+            List<Property> properties = cluster.getProperties().getProperties();
+            if (properties != null && !properties.isEmpty()) {
+                Map<String, String> hiveProperties = new HashMap<String, String>();
+                for (Property prop : properties) {
+                    if (prop.getName().startsWith("hive.")) {
+                        hiveProperties.put(prop.getName(), prop.getValue());
+                    }
+                }
+                return hiveProperties;
+            }
+        }
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index f5ff27a..7d84938 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -483,6 +483,12 @@ public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBui
                     "Registry interface is not defined in cluster: " + cluster.getName());
         }
 
+        // Propagate the hive properties from cluster entity
+        Map<String, String> hiveProperties = ClusterHelper.geHiveProperties(cluster);
+        if (hiveProperties != null && !hiveProperties.isEmpty()) {
+            hiveCredentials.putAll(hiveProperties);
+        }
+
         hiveCredentials.put(METASTOREURIS, metaStoreUrl);
         hiveCredentials.put("hive.metastore.execute.setugi", "true");
         hiveCredentials.put("hcatNode", metaStoreUrl.replace("thrift", "hcat"));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
index 8cfa9fc..2522ca3 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessWorkflowBuilderTest.java
@@ -289,8 +289,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
             props.put(prop.getName(), prop.getValue());
         }
 
-        // verify table props
+        // verify table and hive props
         Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
+        expected.putAll(ClusterHelper.geHiveProperties(cluster));
         for (Map.Entry<String, String> entry : props.entrySet()) {
             if (expected.containsKey(entry.getKey())) {
                 Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3d3bde55/process/src/test/resources/config/cluster/cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/process/src/test/resources/config/cluster/cluster-0.1.xml b/process/src/test/resources/config/cluster/cluster-0.1.xml
index 3780b3f..032cc77 100644
--- a/process/src/test/resources/config/cluster/cluster-0.1.xml
+++ b/process/src/test/resources/config/cluster/cluster-0.1.xml
@@ -39,5 +39,6 @@
     <properties>
         <property name="field1" value="value1"/>
         <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
     </properties>
 </cluster>