You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/07 02:15:46 UTC
[4/5] git commit: FALCON-773 Log clean up handlers only work in distributed mode. Contributed by Balu Vellanki
FALCON-773 Log clean up handlers only work in distributed mode. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/de8c94a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/de8c94a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/de8c94a1
Branch: refs/heads/master
Commit: de8c94a1ef076302b42170ead7d37f3b7999d2ac
Parents: 7ab8eb3
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Mon Oct 6 16:10:27 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Mon Oct 6 16:10:27 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../falcon/cleanup/AbstractCleanupHandler.java | 30 +++++---
.../falcon/cleanup/FeedCleanupHandler.java | 12 +---
.../falcon/cleanup/ProcessCleanupHandler.java | 13 +---
.../falcon/service/LogCleanupService.java | 2 -
.../workflow/WorkflowExecutionContext.java | 7 +-
common/src/main/resources/runtime.properties | 6 +-
common/src/main/resources/startup.properties | 2 +-
.../falcon/cleanup/LogCleanupServiceTest.java | 73 +++-----------------
.../workflow/WorkflowExecutionContextTest.java | 2 +-
src/conf/runtime.properties | 4 +-
11 files changed, 49 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e22bc1e..c0e5e24 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-773 Log clean up handlers only work in distributed mode
+ (Balu Vellanki via Venkatesh Seetharam)
+
FALCON-760 Messaging is broken for FALCON.ENTITY.TOPIC in case of Eviction
(Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 846d48a..c315c25 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -23,11 +23,13 @@ import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.fs.FileStatus;
@@ -95,17 +97,24 @@ public abstract class AbstractCleanupHandler {
return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
}
- protected void delete(Cluster cluster, Entity entity, long retention)
- throws FalconException {
-
- FileStatus[] logs = getAllLogs(cluster, entity);
- delete(cluster, entity, retention, logs);
+ protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
+ Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
+ if (isClusterInCurrentColo(currentCluster.getColo())) {
+ LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
+ entity.getEntityType(), entity.getName(), clusterName, retention);
+ FileStatus[] logs = getAllLogs(currentCluster, entity);
+ deleteInternal(currentCluster, entity, retention, logs);
+ } else {
+ LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
+ entity.getEntityType(), entity.getName(), clusterName);
+ }
}
- protected void delete(Cluster cluster, Entity entity, long retention, FileStatus[] logs)
- throws FalconException {
+ protected void deleteInternal(Cluster cluster, Entity entity,
+ long retention, FileStatus[] logs) throws FalconException {
if (logs == null || logs.length == 0) {
- LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(), entity.getName());
+ LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
+ entity.getName());
return;
}
@@ -146,7 +155,8 @@ public abstract class AbstractCleanupHandler {
protected abstract String getRelativeLogPath();
- protected String getCurrentColo() {
- return StartupProperties.get().getProperty("current.colo", "default");
+ protected boolean isClusterInCurrentColo(String colo) {
+ final String currentColo = StartupProperties.get().getProperty("current.colo", "default");
+ return DeploymentUtil.isEmbeddedMode() || currentColo.equals(colo);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 5d4ecd9..16db7d8 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -19,7 +19,6 @@ package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import java.util.Collection;
@@ -37,17 +36,8 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
long retention = getRetention(feed, feed.getFrequency().getTimeUnit());
for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
- Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
- if (currentCluster.getColo().equals(getCurrentColo())) {
- LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}",
- feedName, cluster.getName(), retention);
- delete(currentCluster, feed, retention);
- } else {
- LOG.info("Ignoring cleanup for feed: {} in cluster: {} as this does not belong to current colo",
- feedName, cluster.getName());
- }
+ delete(cluster.getName(), feed, retention);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
index 4eb9162..00281f9 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
@@ -19,7 +19,6 @@ package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.process.Process;
import java.util.Collection;
@@ -36,16 +35,8 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
Process process = STORE.get(EntityType.PROCESS, processName);
long retention = getRetention(process, process.getFrequency().getTimeUnit());
- for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters() .getClusters()) {
- Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
- if (currentCluster.getColo().equals(getCurrentColo())) {
- LOG.info("Cleaning up logs for process: {} in cluster: {} with retention: {}",
- processName, cluster.getName(), retention);
- delete(currentCluster, process, retention);
- } else {
- LOG.info("Ignoring cleanup for process: {} in cluster: {} as this does not belong to current colo",
- processName, cluster.getName());
- }
+ for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+ delete(cluster.getName(), process, retention);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
index a32eaa7..9962102 100644
--- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
+++ b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
@@ -54,7 +54,6 @@ public class LogCleanupService implements FalconService {
Timer timer = new Timer();
timer.schedule(new CleanupThread(), 0, getDelay());
LOG.info("Falcon log cleanup service initialized");
-
}
private static class CleanupThread extends TimerTask {
@@ -91,5 +90,4 @@ public class LogCleanupService implements FalconService {
throw new FalconException("Exception in EL evaluation", e);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 4c94c27..8df6855 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -125,7 +126,9 @@ public class WorkflowExecutionContext {
}
public String getContextFile() {
- return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
+ return EntityType.PROCESS.name().equals(getEntityType())
+ ? getValue(WorkflowExecutionArgs.CONTEXT_FILE)
+ : "/context/" + getValue(WorkflowExecutionArgs.CONTEXT_FILE); // needed by feed clean up
}
public String getLogDir() {
@@ -188,7 +191,7 @@ public class WorkflowExecutionContext {
}
public String getEntityType() {
- return getValue(WorkflowExecutionArgs.ENTITY_TYPE);
+ return getValue(WorkflowExecutionArgs.ENTITY_TYPE).toUpperCase();
}
public EntityOperations getOperation() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 0a4de94..185602c 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -18,8 +18,8 @@
*.domain=debug
-*.log.cleanup.frequency.minutes.retention =hours(6)
-*.log.cleanup.frequency.hours.retention =minutes(1)
+*.log.cleanup.frequency.minutes.retention=minutes(15)
+*.log.cleanup.frequency.hours.retention=hours(6)
*.log.cleanup.frequency.days.retention =days(7)
*.log.cleanup.frequency.months.retention =months(3)
@@ -27,4 +27,4 @@
*.falcon.parentworkflow.retry.interval.secs=1
*.falcon.replication.workflow.maxmaps=5
-*.falcon.replication.workflow.mapbandwidthKB=102400
\ No newline at end of file
+*.falcon.replication.workflow.mapbandwidthKB=102400
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index e233b2a..99aa3c0 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -72,7 +72,7 @@ debug.libext.feed.retention.paths=${falcon.libext}
debug.libext.feed.replication.paths=${falcon.libext}
debug.libext.process.paths=${falcon.libext}
-*.falcon.cleanup.service.frequency=days(1)
+*.falcon.cleanup.service.frequency=minutes(5)
######### Properties for configuring JMS provider - activemq #########
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 432d06b..6ad742e 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -18,18 +18,13 @@
package org.apache.falcon.cleanup;
import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -48,10 +43,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
private FileSystem fs;
private FileSystem tfs;
private EmbeddedCluster targetDfsCluster;
- private Path sourceStagingPath1;
- private Path sourceStagingPath2;
- private Path targetStagingPath1;
- private Path targetStagingPath2;
private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
+ "sample" + "/logs/job-2010-01-01-01-00/000");
@@ -123,8 +114,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
// table feed staging dir setup
initializeStagingDirs();
- createStageData(sourceStagingPath1, targetStagingPath1, 0);
- createStageData(sourceStagingPath2, targetStagingPath2, 10000);
Thread.sleep(1000);
}
@@ -132,47 +121,16 @@ public class LogCleanupServiceTest extends AbstractTestBase {
final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
Feed tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
getStore().publish(EntityType.FEED, tableFeed);
-
- final Cluster srcCluster = dfsCluster.getCluster();
- final CatalogStorage sourceStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
- String sourceStagingDir = FeedHelper.getStagingDir(srcCluster, tableFeed, sourceStorage, Tag.REPLICATION);
-
- sourceStagingPath1 = new Path(sourceStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
- sourceStagingPath2 = new Path(sourceStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
-
- final Cluster targetCluster = targetDfsCluster.getCluster();
- final CatalogStorage targetStorage = (CatalogStorage) FeedHelper.createStorage(targetCluster, tableFeed);
- String targetStagingDir = FeedHelper.getStagingDir(targetCluster, tableFeed, targetStorage, Tag.REPLICATION);
-
- targetStagingPath1 = new Path(targetStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
- targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
- }
-
- private void createStageData(Path sourcePath, Path targetPath, int offset) throws Exception {
- fs.mkdirs(sourcePath);
- Path metaSource = new Path(sourcePath, "_metadata.xml");
- Path dataSource = new Path(sourcePath, "data.txt");
- fs.createNewFile(metaSource);
- fs.createNewFile(dataSource);
- FileStatus status = fs.getFileStatus(metaSource);
- fs.setTimes(metaSource, status.getModificationTime() + offset, status.getAccessTime());
- status = fs.getFileStatus(dataSource);
- fs.setTimes(dataSource, status.getModificationTime() + offset, status.getAccessTime());
-
- tfs.mkdirs(targetPath);
- Path metaTarget = new Path(targetPath, "_metadata.xml");
- Path dataTarget = new Path(targetPath, "data.txt");
- tfs.createNewFile(metaTarget);
- tfs.createNewFile(dataTarget);
- status = tfs.getFileStatus(metaTarget);
- tfs.setTimes(metaTarget, status.getModificationTime() + offset, status.getAccessTime());
- status = tfs.getFileStatus(dataTarget);
- tfs.setTimes(dataTarget, status.getModificationTime() + offset, status.getAccessTime());
}
@Test
public void testProcessLogs() throws IOException, FalconException, InterruptedException {
+ Assert.assertTrue(fs.exists(instanceLogPath));
+ Assert.assertTrue(fs.exists(instanceLogPath1));
+ Assert.assertTrue(fs.exists(instanceLogPath2));
+ Assert.assertTrue(fs.exists(instanceLogPath3));
+
AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
processCleanupHandler.cleanup();
@@ -182,9 +140,14 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertTrue(fs.exists(instanceLogPath3));
}
- @Test (enabled = false)
+ @Test
public void testFeedLogs() throws IOException, FalconException, InterruptedException {
+ Assert.assertTrue(fs.exists(feedInstanceLogPath));
+ Assert.assertTrue(tfs.exists(feedInstanceLogPath));
+ Assert.assertTrue(fs.exists(feedInstanceLogPath1));
+ Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
+
AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
feedCleanupHandler.cleanup();
@@ -192,19 +155,5 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertFalse(tfs.exists(feedInstanceLogPath));
Assert.assertTrue(fs.exists(feedInstanceLogPath1));
Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-
- // source table replication staging dirs
- Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "_metadata.xml")));
- Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "data.txt")));
-
- Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "_metadata.xml")));
- Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "data.txt")));
-
- // target table replication staging dirs
- Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "_metadata.xml")));
- Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "data.txt")));
-
- Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "_metadata.xml")));
- Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "data.txt")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index a45633b..117f2b3 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -141,7 +141,7 @@ public class WorkflowExecutionContextTest {
@Test
public void testGetEntityType() throws Exception {
- Assert.assertEquals(context.getEntityType(), "process");
+ Assert.assertEquals(context.getEntityType(), "PROCESS");
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 89e871d..f9e26de 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -22,8 +22,8 @@
*.domain=${falcon.app.type}
-*.log.cleanup.frequency.minutes.retention=hours(6)
-*.log.cleanup.frequency.hours.retention=minutes(1)
+*.log.cleanup.frequency.minutes.retention=minutes(15)
+*.log.cleanup.frequency.hours.retention=hours(6)
*.log.cleanup.frequency.days.retention=days(7)
*.log.cleanup.frequency.months.retention=months(3)