You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/07 02:15:46 UTC

[4/5] git commit: FALCON-773 Log cleanup handlers only work in distributed mode. Contributed by Balu Vellanki

FALCON-773 Log cleanup handlers only work in distributed mode. Contributed by Balu Vellanki


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/de8c94a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/de8c94a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/de8c94a1

Branch: refs/heads/master
Commit: de8c94a1ef076302b42170ead7d37f3b7999d2ac
Parents: 7ab8eb3
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Mon Oct 6 16:10:27 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Mon Oct 6 16:10:27 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../falcon/cleanup/AbstractCleanupHandler.java  | 30 +++++---
 .../falcon/cleanup/FeedCleanupHandler.java      | 12 +---
 .../falcon/cleanup/ProcessCleanupHandler.java   | 13 +---
 .../falcon/service/LogCleanupService.java       |  2 -
 .../workflow/WorkflowExecutionContext.java      |  7 +-
 common/src/main/resources/runtime.properties    |  6 +-
 common/src/main/resources/startup.properties    |  2 +-
 .../falcon/cleanup/LogCleanupServiceTest.java   | 73 +++-----------------
 .../workflow/WorkflowExecutionContextTest.java  |  2 +-
 src/conf/runtime.properties                     |  4 +-
 11 files changed, 49 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e22bc1e..c0e5e24 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -109,6 +109,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-773 Log clean up handlers only work in distributed mode
+   (Balu Vellanki via Venkatesh Seetharam)
+
    FALCON-760 Messaging is broken for FALCON.ENTITY.TOPIC in case of Eviction
    (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
index 846d48a..c315c25 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/AbstractCleanupHandler.java
@@ -23,11 +23,13 @@ import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
 import org.apache.falcon.entity.v0.Frequency.TimeUnit;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.RuntimeProperties;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.fs.FileStatus;
@@ -95,17 +97,24 @@ public abstract class AbstractCleanupHandler {
         return HadoopClientFactory.get().createFileSystem(ClusterHelper.getConfiguration(cluster));
     }
 
-    protected void delete(Cluster cluster, Entity entity, long retention)
-        throws FalconException {
-
-        FileStatus[] logs = getAllLogs(cluster, entity);
-        delete(cluster, entity, retention, logs);
+    protected void delete(String clusterName, Entity entity, long retention) throws FalconException {
+        Cluster currentCluster = STORE.get(EntityType.CLUSTER, clusterName);
+        if (isClusterInCurrentColo(currentCluster.getColo())) {
+            LOG.info("Cleaning up logs for {}: {} in cluster: {} with retention: {}",
+                    entity.getEntityType(), entity.getName(), clusterName, retention);
+            FileStatus[] logs = getAllLogs(currentCluster, entity);
+            deleteInternal(currentCluster, entity, retention, logs);
+        } else {
+            LOG.info("Ignoring cleanup for {}: {} in cluster: {} as this does not belong to current colo",
+                    entity.getEntityType(), entity.getName(), clusterName);
+        }
     }
 
-    protected void delete(Cluster cluster, Entity entity, long retention, FileStatus[] logs)
-        throws FalconException {
+    protected void deleteInternal(Cluster cluster, Entity entity,
+                                  long retention, FileStatus[] logs) throws FalconException {
         if (logs == null || logs.length == 0) {
-            LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(), entity.getName());
+            LOG.info("Nothing to delete for cluster: {}, entity: {}", cluster.getName(),
+                    entity.getName());
             return;
         }
 
@@ -146,7 +155,8 @@ public abstract class AbstractCleanupHandler {
 
     protected abstract String getRelativeLogPath();
 
-    protected String getCurrentColo() {
-        return StartupProperties.get().getProperty("current.colo", "default");
+    protected boolean isClusterInCurrentColo(String colo) {
+        final String currentColo = StartupProperties.get().getProperty("current.colo", "default");
+        return DeploymentUtil.isEmbeddedMode() || currentColo.equals(colo);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
index 5d4ecd9..16db7d8 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/FeedCleanupHandler.java
@@ -19,7 +19,6 @@ package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 
 import java.util.Collection;
@@ -37,17 +36,8 @@ public class FeedCleanupHandler extends AbstractCleanupHandler {
             long retention = getRetention(feed, feed.getFrequency().getTimeUnit());
 
             for (org.apache.falcon.entity.v0.feed.Cluster cluster : feed.getClusters().getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
-                if (currentCluster.getColo().equals(getCurrentColo())) {
-                    LOG.info("Cleaning up logs & staged data for feed: {} in cluster: {} with retention: {}",
-                            feedName, cluster.getName(), retention);
-                    delete(currentCluster, feed, retention);
-                } else {
-                    LOG.info("Ignoring cleanup for feed: {} in cluster: {} as this does not belong to current colo",
-                            feedName, cluster.getName());
-                }
+                delete(cluster.getName(), feed, retention);
             }
-
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
index 4eb9162..00281f9 100644
--- a/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
+++ b/common/src/main/java/org/apache/falcon/cleanup/ProcessCleanupHandler.java
@@ -19,7 +19,6 @@ package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.process.Process;
 
 import java.util.Collection;
@@ -36,16 +35,8 @@ public class ProcessCleanupHandler extends AbstractCleanupHandler {
             Process process = STORE.get(EntityType.PROCESS, processName);
             long retention = getRetention(process, process.getFrequency().getTimeUnit());
 
-            for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters() .getClusters()) {
-                Cluster currentCluster = STORE.get(EntityType.CLUSTER, cluster.getName());
-                if (currentCluster.getColo().equals(getCurrentColo())) {
-                    LOG.info("Cleaning up logs for process: {} in cluster: {} with retention: {}",
-                            processName, cluster.getName(), retention);
-                    delete(currentCluster, process, retention);
-                } else {
-                    LOG.info("Ignoring cleanup for process: {} in cluster: {} as this does not belong to current colo",
-                            processName, cluster.getName());
-                }
+            for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
+                delete(cluster.getName(), process, retention);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
index a32eaa7..9962102 100644
--- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
+++ b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
@@ -54,7 +54,6 @@ public class LogCleanupService implements FalconService {
         Timer timer = new Timer();
         timer.schedule(new CleanupThread(), 0, getDelay());
         LOG.info("Falcon log cleanup service initialized");
-
     }
 
     private static class CleanupThread extends TimerTask {
@@ -91,5 +90,4 @@ public class LogCleanupService implements FalconService {
             throw new FalconException("Exception in EL evaluation", e);
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 4c94c27..8df6855 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -25,6 +25,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -125,7 +126,9 @@ public class WorkflowExecutionContext {
     }
 
     public String getContextFile() {
-        return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
+        return EntityType.PROCESS.name().equals(getEntityType())
+            ? getValue(WorkflowExecutionArgs.CONTEXT_FILE)
+            : "/context/" + getValue(WorkflowExecutionArgs.CONTEXT_FILE); // needed by feed clean up
     }
 
     public String getLogDir() {
@@ -188,7 +191,7 @@ public class WorkflowExecutionContext {
     }
 
     public String getEntityType() {
-        return getValue(WorkflowExecutionArgs.ENTITY_TYPE);
+        return getValue(WorkflowExecutionArgs.ENTITY_TYPE).toUpperCase();
     }
 
     public EntityOperations getOperation() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
index 0a4de94..185602c 100644
--- a/common/src/main/resources/runtime.properties
+++ b/common/src/main/resources/runtime.properties
@@ -18,8 +18,8 @@
 
 *.domain=debug
 
-*.log.cleanup.frequency.minutes.retention =hours(6)
-*.log.cleanup.frequency.hours.retention =minutes(1)
+*.log.cleanup.frequency.minutes.retention=minutes(15)
+*.log.cleanup.frequency.hours.retention=hours(6)
 *.log.cleanup.frequency.days.retention =days(7)
 *.log.cleanup.frequency.months.retention =months(3)
 
@@ -27,4 +27,4 @@
 *.falcon.parentworkflow.retry.interval.secs=1
 
 *.falcon.replication.workflow.maxmaps=5
-*.falcon.replication.workflow.mapbandwidthKB=102400
\ No newline at end of file
+*.falcon.replication.workflow.mapbandwidthKB=102400

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index e233b2a..99aa3c0 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -72,7 +72,7 @@ debug.libext.feed.retention.paths=${falcon.libext}
 debug.libext.feed.replication.paths=${falcon.libext}
 debug.libext.process.paths=${falcon.libext}
 
-*.falcon.cleanup.service.frequency=days(1)
+*.falcon.cleanup.service.frequency=minutes(5)
 
 
 ######### Properties for configuring JMS provider - activemq #########

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 432d06b..6ad742e 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -18,18 +18,13 @@
 package org.apache.falcon.cleanup;
 
 import org.apache.falcon.FalconException;
-import org.apache.falcon.Tag;
 import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -48,10 +43,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
     private FileSystem fs;
     private FileSystem tfs;
     private EmbeddedCluster targetDfsCluster;
-    private Path sourceStagingPath1;
-    private Path sourceStagingPath2;
-    private Path targetStagingPath1;
-    private Path targetStagingPath2;
 
     private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
         + "sample" + "/logs/job-2010-01-01-01-00/000");
@@ -123,8 +114,6 @@ public class LogCleanupServiceTest extends AbstractTestBase {
 
         // table feed staging dir setup
         initializeStagingDirs();
-        createStageData(sourceStagingPath1, targetStagingPath1, 0);
-        createStageData(sourceStagingPath2, targetStagingPath2, 10000);
         Thread.sleep(1000);
     }
 
@@ -132,47 +121,16 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
         Feed tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
         getStore().publish(EntityType.FEED, tableFeed);
-
-        final Cluster srcCluster = dfsCluster.getCluster();
-        final CatalogStorage sourceStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
-        String sourceStagingDir = FeedHelper.getStagingDir(srcCluster, tableFeed, sourceStorage, Tag.REPLICATION);
-
-        sourceStagingPath1 = new Path(sourceStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
-        sourceStagingPath2 = new Path(sourceStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
-
-        final Cluster targetCluster = targetDfsCluster.getCluster();
-        final CatalogStorage targetStorage = (CatalogStorage) FeedHelper.createStorage(targetCluster, tableFeed);
-        String targetStagingDir = FeedHelper.getStagingDir(targetCluster, tableFeed, targetStorage, Tag.REPLICATION);
-
-        targetStagingPath1 = new Path(targetStagingDir + "/ds=2012092400/" + System.currentTimeMillis());
-        targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
-    }
-
-    private void createStageData(Path sourcePath, Path targetPath, int offset) throws Exception {
-        fs.mkdirs(sourcePath);
-        Path metaSource = new Path(sourcePath, "_metadata.xml");
-        Path dataSource = new Path(sourcePath, "data.txt");
-        fs.createNewFile(metaSource);
-        fs.createNewFile(dataSource);
-        FileStatus status = fs.getFileStatus(metaSource);
-        fs.setTimes(metaSource, status.getModificationTime() + offset, status.getAccessTime());
-        status = fs.getFileStatus(dataSource);
-        fs.setTimes(dataSource, status.getModificationTime() + offset, status.getAccessTime());
-
-        tfs.mkdirs(targetPath);
-        Path metaTarget = new Path(targetPath, "_metadata.xml");
-        Path dataTarget = new Path(targetPath, "data.txt");
-        tfs.createNewFile(metaTarget);
-        tfs.createNewFile(dataTarget);
-        status = tfs.getFileStatus(metaTarget);
-        tfs.setTimes(metaTarget, status.getModificationTime() + offset, status.getAccessTime());
-        status = tfs.getFileStatus(dataTarget);
-        tfs.setTimes(dataTarget, status.getModificationTime() + offset, status.getAccessTime());
     }
 
     @Test
     public void testProcessLogs() throws IOException, FalconException, InterruptedException {
 
+        Assert.assertTrue(fs.exists(instanceLogPath));
+        Assert.assertTrue(fs.exists(instanceLogPath1));
+        Assert.assertTrue(fs.exists(instanceLogPath2));
+        Assert.assertTrue(fs.exists(instanceLogPath3));
+
         AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
         processCleanupHandler.cleanup();
 
@@ -182,9 +140,14 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         Assert.assertTrue(fs.exists(instanceLogPath3));
     }
 
-    @Test (enabled = false)
+    @Test
     public void testFeedLogs() throws IOException, FalconException, InterruptedException {
 
+        Assert.assertTrue(fs.exists(feedInstanceLogPath));
+        Assert.assertTrue(tfs.exists(feedInstanceLogPath));
+        Assert.assertTrue(fs.exists(feedInstanceLogPath1));
+        Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
+
         AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
         feedCleanupHandler.cleanup();
 
@@ -192,19 +155,5 @@ public class LogCleanupServiceTest extends AbstractTestBase {
         Assert.assertFalse(tfs.exists(feedInstanceLogPath));
         Assert.assertTrue(fs.exists(feedInstanceLogPath1));
         Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-
-        // source table replication staging dirs
-        Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "_metadata.xml")));
-        Assert.assertFalse(fs.exists(new Path(sourceStagingPath1, "data.txt")));
-
-        Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "_metadata.xml")));
-        Assert.assertTrue(fs.exists(new Path(sourceStagingPath2, "data.txt")));
-
-        // target table replication staging dirs
-        Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "_metadata.xml")));
-        Assert.assertFalse(tfs.exists(new Path(targetStagingPath1, "data.txt")));
-
-        Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "_metadata.xml")));
-        Assert.assertTrue(tfs.exists(new Path(targetStagingPath2, "data.txt")));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index a45633b..117f2b3 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -141,7 +141,7 @@ public class WorkflowExecutionContextTest {
 
     @Test
     public void testGetEntityType() throws Exception {
-        Assert.assertEquals(context.getEntityType(), "process");
+        Assert.assertEquals(context.getEntityType(), "PROCESS");
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/de8c94a1/src/conf/runtime.properties
----------------------------------------------------------------------
diff --git a/src/conf/runtime.properties b/src/conf/runtime.properties
index 89e871d..f9e26de 100644
--- a/src/conf/runtime.properties
+++ b/src/conf/runtime.properties
@@ -22,8 +22,8 @@
 
 *.domain=${falcon.app.type}
 
-*.log.cleanup.frequency.minutes.retention=hours(6)
-*.log.cleanup.frequency.hours.retention=minutes(1)
+*.log.cleanup.frequency.minutes.retention=minutes(15)
+*.log.cleanup.frequency.hours.retention=hours(6)
 *.log.cleanup.frequency.days.retention=days(7)
 *.log.cleanup.frequency.months.retention=months(3)