You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/05/31 06:40:34 UTC
[1/2] git commit: FALCON-446 Hive Replications fail because of
permissions issue. Contributed by Venkatesh Seetharam
Repository: incubator-falcon
Updated Branches:
refs/heads/master 3d3bde558 -> 91704c6ae
FALCON-446 Hive Replications fail because of permissions issue. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5ac7544b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5ac7544b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5ac7544b
Branch: refs/heads/master
Commit: 5ac7544bcb0d9e578d5ddc78de9382e5057220af
Parents: 3d3bde5
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 30 21:38:39 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 30 21:38:39 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../falcon/cleanup/AbstractCleanupHandler.java | 17 ++++--
.../falcon/cleanup/FeedCleanupHandler.java | 60 +++-----------------
.../falcon/cleanup/ProcessCleanupHandler.java | 23 +++-----
.../org/apache/falcon/entity/FeedHelper.java | 5 +-
.../config/workflow/replication-workflow.xml | 8 +++
.../converter/OozieFeedWorkflowBuilderTest.java | 5 +-
7 files changed, 44 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e06fc80..d851109 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-446 Hive Replications fail because of permissions issue
+ (Venkatesh Seetharam)
+
FALCON-444 Logs dir for replication workflow is incorrect and jobs fail
with permission issues (Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index ab85ae0..846d48a 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -20,6 +20,7 @@ package org.apache.falcon.cleanup;
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.Frequency;
@@ -69,21 +70,25 @@ public abstract class AbstractCleanupHandler {
"log.cleanup.frequency." + timeunit + ".retention", "days(1)");
}
- protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster, Entity entity)
- throws FalconException {
-
- String stagingPath = ClusterHelper.getLocation(cluster, "staging");
- Path logPath = getLogPath(entity, stagingPath);
+ protected FileStatus[] getAllLogs(org.apache.falcon.entity.v0.cluster.Cluster cluster,
+ Entity entity) throws FalconException {
FileSystem fs = getFileSystem(cluster);
FileStatus[] paths;
try {
+ Path logPath = getLogPath(cluster, entity);
paths = fs.globStatus(logPath);
} catch (IOException e) {
throw new FalconException(e);
}
+
return paths;
}
+ private Path getLogPath(Cluster cluster, Entity entity) {
+ // logsPath = base log path + relative path
+ return new Path(EntityUtil.getLogPath(cluster, entity), getRelativeLogPath());
+ }
+
protected FileSystem getFileSystem(org.apache.falcon.entity.v0.cluster.Cluster cluster)
throws FalconException {
@@ -139,7 +144,7 @@ public abstract class AbstractCleanupHandler {
public abstract void cleanup() throws FalconException;
- protected abstract Path getLogPath(Entity entity, String stagingPath);
+ protected abstract String getRelativeLogPath();
protected String getCurrentColo() {
return StartupProperties.get().getProperty("current.colo", "default");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 452ab02..5d4ecd9 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -18,19 +18,10 @@
package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import java.io.IOException;
import java.util.Collection;
/**
@@ -42,19 +33,15 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
public void cleanup() throws FalconException {
Collection<String> feeds = STORE.getEntities(EntityType.FEED);
for (String feedName : feeds) {
- Feed feed;
- feed = STORE.get(EntityType.FEED, feedName);
- long retention = getRetention(feed, feed.getFrequency()
- .getTimeUnit());
- for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed
- .getClusters().getClusters()) {
- Cluster currentCluster = STORE.get(EntityType.CLUSTER,
- cluster.getName());
+ Feed feed = STORE.get(EntityType.FEED, feedName);
+ long retention = getRetention(feed, feed.getFrequency().getTimeUnit());
+
+ for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
+ Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
if (currentCluster.getColo().equals(getCurrentColo())) {
- LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}", feedName,
- cluster.getName(), retention);
+ LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}",
+ feedName, cluster.getName(), retention);
delete(currentCluster, feed, retention);
- deleteStagedData(currentCluster, feed, retention);
} else {
LOG.info("Ignoring cleanup for feed: {} in cluster: {} as this does not belong to current colo",
feedName, cluster.getName());
@@ -64,37 +51,8 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
}
}
- /**
- * Delete the staging area used for replicating tables.
- *
- * @param cluster cluster hosting the staged data
- * @param feed feed entity
- * @param retention retention limit
- * @throws FalconException
- */
- private void deleteStagedData(Cluster cluster, Feed feed, long retention)
- throws FalconException {
- Storage storage = FeedHelper.createStorage(cluster, feed);
- if (storage.getType() == Storage.TYPE.FILESYSTEM) { // FS does NOT use staging dirs
- return;
- }
-
- final CatalogStorage tableStorage = (CatalogStorage) storage;
- String stagingDir = FeedHelper.getStagingDir(cluster, feed, tableStorage, Tag.REPLICATION);
- //stagingDir/dataOutPartitionValue/nominal-time/clusterName/data
- Path stagingPath = new Path(stagingDir + "/*/*/*/*");
- FileSystem fs = getFileSystem(cluster);
- try {
- FileStatus[] paths = fs.globStatus(stagingPath);
- delete(cluster, feed, retention, paths);
- } catch (IOException e) {
- throw new FalconException(e);
- }
- }
-
@Override
- protected Path getLogPath(Entity entity, String stagingPath) {
- return new Path(stagingPath, "falcon/workflows/feed/"
- + entity.getName() + "/logs/job-*/*/*");
+ protected String getRelativeLogPath() {
+ return "job-*/*/*";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
index e6ce72f..4eb9162 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
@@ -18,11 +18,9 @@
package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.fs.Path;
import java.util.Collection;
@@ -35,14 +33,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
public void cleanup() throws FalconException {
Collection<String> processes = STORE.getEntities(EntityType.PROCESS);
for (String processName : processes) {
- Process process;
- process = STORE.get(EntityType.PROCESS, processName);
- long retention = getRetention(process, process.getFrequency()
- .getTimeUnit());
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process
- .getClusters().getClusters()) {
- Cluster currentCluster = STORE.get(EntityType.CLUSTER,
- cluster.getName());
+ Process process = STORE.get(EntityType.PROCESS, processName);
+ long retention = getRetention(process, process.getFrequency().getTimeUnit());
+
+ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters() .getClusters()) {
+ Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
if (currentCluster.getColo().equals(getCurrentColo())) {
LOG.info("Cleaning up logs for process: {} in cluster: {} with retention: {}",
processName, cluster.getName(), retention);
@@ -52,15 +47,11 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
processName, cluster.getName());
}
}
-
}
}
@Override
- protected Path getLogPath(Entity entity, String stagingPath) {
- Path logPath = new Path(stagingPath, "falcon/workflows/process/"
- + entity.getName() + "/logs/job-*/*");
- return logPath;
+ protected String getRelativeLogPath() {
+ return "job-*/*";
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index 9b66363..44d8d01 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -266,10 +266,11 @@ public final class FeedHelper {
String workflowName = EntityUtil.getWorkflowName(
tag, Arrays.asList(clusterEntity.getName()), feed).toString();
- return ClusterHelper.getCompleteLocation(clusterEntity, "staging") + "/"
+ // log path is created at scheduling wf and has 777 perms
+ return ClusterHelper.getStorageUrl(clusterEntity)
+ + EntityUtil.getLogPath(clusterEntity, feed) + "/"
+ workflowName + "/"
+ storage.getDatabase() + "/"
+ storage.getTable();
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index 6f94dd7..a7acfe1 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -158,6 +158,14 @@
<param>falconTargetPartition=${falconTargetPartition}</param>
<param>falconTargetStagingDir=${distcpTargetPaths}</param>
</hive>
+ <ok to="cleanup-table-staging-dir"/>
+ <error to="failed-post-processing"/>
+ </action>
+ <action name="cleanup-table-staging-dir">
+ <fs>
+ <delete path="${distcpSourcePaths}"/>
+ <delete path="${distcpTargetPaths}"/>
+ </fs>
<ok to="succeeded-post-processing"/>
<error to="failed-post-processing"/>
</action>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5ac7544b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
index 64b38ff..5d6879a 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedWorkflowBuilderTest.java
@@ -374,8 +374,9 @@ public class OozieFeedWorkflowBuilderTest {
Assert.assertEquals("replication", ((ACTION) decisionOrForkOrJoin.get(4)).getName());
Assert.assertEquals("post-replication-decision", ((DECISION) decisionOrForkOrJoin.get(5)).getName());
Assert.assertEquals("table-import", ((ACTION) decisionOrForkOrJoin.get(6)).getName());
- Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
- Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+ Assert.assertEquals("cleanup-table-staging-dir", ((ACTION) decisionOrForkOrJoin.get(7)).getName());
+ Assert.assertEquals("succeeded-post-processing", ((ACTION) decisionOrForkOrJoin.get(8)).getName());
+ Assert.assertEquals("failed-post-processing", ((ACTION) decisionOrForkOrJoin.get(9)).getName());
}
@DataProvider(name = "secureOptions")
[2/2] git commit: FALCON-456 Custom DistCp conflict with core DistCp
in container classpath fails falcon workflows. Contributed by Venkatesh
Seetharam
Posted by ve...@apache.org.
FALCON-456 Custom DistCp conflict with core DistCp in container classpath fails falcon workflows. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/91704c6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/91704c6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/91704c6a
Branch: refs/heads/master
Commit: 91704c6aed86bdc8e5f2dc7981d69ade75048973
Parents: 5ac7544
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri May 30 21:40:18 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri May 30 21:40:18 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../main/resources/config/workflow/replication-workflow.xml | 8 ++++++++
2 files changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/91704c6a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d851109..075744a 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-456 Custom DistCp conflict with core DistCp in container classpath
+ fails falcon workflows (Venkatesh Seetharam)
+
FALCON-446 Hive Replications fail because of permissions issue
(Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/91704c6a/feed/src/main/resources/config/workflow/replication-workflow.xml
----------------------------------------------------------------------
diff --git a/feed/src/main/resources/config/workflow/replication-workflow.xml b/feed/src/main/resources/config/workflow/replication-workflow.xml
index a7acfe1..0748acf 100644
--- a/feed/src/main/resources/config/workflow/replication-workflow.xml
+++ b/feed/src/main/resources/config/workflow/replication-workflow.xml
@@ -100,6 +100,14 @@
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property> <!-- hadoop 1 parameter -->
+ <name>oozie.launcher.mapreduce.user.classpath.first</name>
+ <value>true</value>
+ </property>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>