You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/31 06:40:34 UTC

[1/2] git commit: FALCON-446 Hive Replications fail because of permissions issue. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master 3d3bde558 -> 91704c6ae


FALCON-446 Hive replications fail because of a permissions issue. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5ac7544b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5ac7544b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5ac7544b

Branch: refs/heads/master
Commit: 5ac7544bcb0d9e578d5ddc78de9382e5057220af
Parents: 3d3bde5
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 30 21:38:39 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 30 21:38:39 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../falcon/cleanup/AbstractCleanupHandler.java  | 17 ++++--
 .../falcon/cleanup/FeedCleanupHandler.java      | 60 +++-----------------
 .../falcon/cleanup/ProcessCleanupHandler.java   | 23 +++-----
 .../org/apache/falcon/entity/FeedHelper.java    |  5 +-
 .../config/workflow/replication-workflow.xml    |  8 +++
 .../converter/OozieFeedWorkflowBuilderTest.java |  5 +-
 7 files changed, 44 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e06fc80..d851109 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-446 Hive Replications fail because of permissions issue
+   (Venkatesh Seetharam)
+
    FALCON-444 Logs dir for replication workflow is incorrect and jobs fail
    with permission issues (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index ab85ae0..846d48a 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -20,6 +20,7 @@ package org.apache.falcon.cleanup;
 import org.apache.commons.el.ExpressionEvaluatorImpl;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.Frequency;
@@ -69,21 +70,25 @@ public abstract class AbstractCleanupHandler {
                 "log.cleanup.frequency." + timeunit + ".retention", "days(1)");
     }
 
-    protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
-        throws FalconException {
-
-        String stagingPath = ClusterHelper.getLocation(cluster, "staging");
-        Path logPath = getLogPath(entity, stagingPath);
+    protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+                                      Entity entity) throws FalconException {
         FileSystem fs = getFileSystem(cluster);
         FileStatus[] paths;
         try {
+            Path logPath = getLogPath(cluster, entity);
             paths = fs.globStatus(logPath);
         } catch (IOException e) {
             throw new FalconException(e);
         }
+
         return paths;
     }
 
+    private Path getLogPath(Cluster cluster, Entity entity) {
+        // logsPath = base log path + relative path
+        return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
+    }
+
     protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
         throws FalconException {
 
@@ -139,7 +144,7 @@ public abstract class AbstractCleanupHandler {
 
     public abstract void cleanup() throws FalconException;
 
-    protected abstract Path getLogPath(Entity entity, String stagingPath);
+    protected abstract String getRelativeLogPath();
 
     protected String getCurrentColo() {
         return StartupProperties.get().getProperty("current.colo", "default");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 452ab02..5d4ecd9 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -18,19 +18,10 @@
 package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 
-import java.io.IOException;
 import java.util.Collection;
 
 /**
@@ -42,19 +33,15 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
     public void cleanup() throws FalconException {
         Collection<String> feeds = STORE.getEntities(EntityType.FEED);
         for (String feedName : feeds) {
-            Feed feed;
-            feed = STORE.get(EntityType.FEED, feedName);
-            long retention = getRetention(feed, feed.getFrequency()
-                    .getTimeUnit());
-            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed
-                    .getClusters().getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
-                        cluster.getName());
+            Feed feed = STORE.get(EntityType.FEED, feedName);
+            long retention = getRetention(feed, feed.getFrequency().getTimeUnit());
+
+            for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+                Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
                 if (currentCluster.getColo().equals(getCurrentColo())) {
-                    LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}", feedName,
-                            cluster.getName(), retention);
+                    LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}",
+                            feedName, cluster.getName(), retention);
                     delete(currentCluster, feed, retention);
-                    deleteStagedData(currentCluster, feed, retention);
                 } else {
                     LOG.info("Ignoring cleanup for feed: {} in cluster: {} as this does not belong to current colo",
                             feedName, cluster.getName());
@@ -64,37 +51,8 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
         }
     }
 
-    /**
-     * Delete the staging area used for replicating tables.
-     *
-     * @param cluster cluster hosting the staged data
-     * @param feed feed entity
-     * @param retention retention limit
-     * @throws FalconException
-     */
-    private void deleteStagedData(Cluster cluster, Feed feed, long retention)
-        throws FalconException {
-        Storage storage = FeedHelper.createStorage(cluster, feed);
-        if (storage.getType() == Storage.TYPE.FILESYSTEM) {  // FS does NOT use staging dirs
-            return;
-        }
-
-        final CatalogStorage tableStorage = (CatalogStorage) storage;
-        String stagingDir = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION);
-        //stagingDir/dataOutPartitionValue/nominal-time/clusterName/data
-        Path stagingPath = new Path(stagingDir + "/*/*/*/*");
-        FileSystem fs = getFileSystem(cluster);
-        try {
-            FileStatus[] paths = fs.globStatus(stagingPath);
-            delete(cluster, feed, retention, paths);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-    }
-
     @Override
-    protected Path getLogPath(Entity entity, String stagingPath) {
-        return new Path(stagingPath, "falcon/workflows/feed/"
-                + entity.getName() + "/logs/job-*/*/*");
+    protected String getRelativeLogPath() {
+        return "job-*/*/*";
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
index e6ce72f..4eb9162 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
@@ -18,11 +18,9 @@
 package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
 
@@ -35,14 +33,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
     public void cleanup() throws FalconException {
         Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
         for (String processName : processes) {
-            Process process;
-            process = STORE.get(EntityType.PROCESS, processName);
-            long retention = getRetention(process, process.getFrequency()
-                    .getTimeUnit());
-            for (org.apache.falcon.entity.v0.process.Cluster cluster : process
-                    .getClusters().getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER,
-                        cluster.getName());
+            Process process = STORE.get(EntityType.PROCESS, processName);
+            long retention = getRetention(process, process.getFrequency().getTimeUnit());
+
+            for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters() .getClusters()) {
+                Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
                 if (currentCluster.getColo().equals(getCurrentColo())) {
                     LOG.info("Cleaning up logs for process: {} in cluster: {} with retention: {}",
                             processName, cluster.getName(), retention);
@@ -52,15 +47,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
                             processName, cluster.getName());
                 }
             }
-
         }
     }
 
     @Override
-    protected Path getLogPath(Entity entity, String stagingPath) {
-        Path logPath = new Path(stagingPath, "falcon/workflows/process/"
-                + entity.getName() + "/logs/job-*/*");
-        return logPath;
+    protected String getRelativeLogPath() {
+        return "job-*/*";
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 9b66363..44d8d01 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -266,10 +266,11 @@ public final class FeedHelper {
         String workflowName = EntityUtil.getWorkflowName(
                 tag, Arrays.asList(clusterEntity.getName()), feed).toString();
 
-        return ClusterHelper.getCompleteLocation(clusterEntity, "staging") + "/"
+        // log path is created at scheduling wf and has 777 perms
+        return ClusterHelper.getStorageUrl(clusterEntity)
+                + EntityUtil.getLogPath(clusterEntity, feed) + "/"
                 + workflowName + "/"
                 + storage.getDatabase() + "/"
                 + storage.getTable();
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 6f94dd7..a7acfe1 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -158,6 +158,14 @@
             <param>falconTargetPartition=${falconTargetPartition}</param>
             <param>falconTargetStagingDir=${distcpTargetPaths}</param>
         </hive>
+        <ok to="cleanup-table-staging-dir"/>
+        <error to="failed-post-processing"/>
+    </action>
+    <action name="cleanup-table-staging-dir">
+        <fs>
+            <delete path="${distcpSourcePaths}"/>
+            <delete path="${distcpTargetPaths}"/>
+        </fs>
         <ok to="succeeded-post-processing"/>
         <error to="failed-post-processing"/>
     </action>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 64b38ff..5d6879a 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -374,8 +374,9 @@ public class OozieFeedWorkflowBuilderTest {
         Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
         Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
         Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
-        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
-        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+        Assert.assertEquals("cleanup-table-staging-dir", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+        Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+        Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(9)).getName());
     }
 
     @DataProvider(name = "secureOptions")


[2/2] git commit: FALCON-456 Custom DistCp conflict with core DistCp in container classpath fails falcon workflows. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-456 Custom DistCp conflict with core DistCp on the container classpath fails Falcon workflows. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/91704c6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/91704c6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/91704c6a

Branch: refs/heads/master
Commit: 91704c6aed86bdc8e5f2dc7981d69ade75048973
Parents: 5ac7544
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 30 21:40:18 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 30 21:40:18 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                                  | 3 +++
 .../main/resources/config/workflow/replication-workflow.xml  | 8 ++++++++
 2 files changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/91704c6a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d851109..075744a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-456 Custom DistCp conflict with core DistCp in container classpath
+   fails falcon workflows (Venkatesh Seetharam)
+
    FALCON-446 Hive Replications fail because of permissions issue
    (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/91704c6a/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index a7acfe1..0748acf 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -100,6 +100,14 @@
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
             <configuration>
+                <property> <!-- hadoop 2 parameter -->
+                    <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+                    <value>true</value>
+                </property>
+                <property> <!-- hadoop 1 parameter -->
+                    <name>oozie.launcher.mapreduce.user.classpath.first</name>
+                    <value>true</value>
+                </property>
                 <property>
                     <name>mapred.job.queue.name</name>
                     <value>${queueName}</value>