You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:21 UTC

[16/41] git commit: FALCON-594 Process lineage information for Retention policies. Contributed by Sowmya Ramesh

FALCON-594 Process lineage information for Retention policies. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d2e5f8c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d2e5f8c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d2e5f8c9

Branch: refs/heads/FALCON-585
Commit: d2e5f8c9ee8c97a0a6011261f1cbabf2dbf4f309
Parents: d74dc32
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Sep 2 12:31:25 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Sep 2 12:31:25 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../InstanceRelationshipGraphBuilder.java       |  41 +++++++
 .../falcon/metadata/MetadataMappingService.java |   4 +-
 .../falcon/metadata/RelationshipLabel.java      |   5 +-
 .../apache/falcon/retention/EvictionHelper.java |  88 +++++++++++++++
 .../metadata/MetadataMappingServiceTest.java    | 112 ++++++++++++++++---
 .../apache/falcon/retention/FeedEvictor.java    |  30 +----
 .../falcon/cluster/util/EmbeddedCluster.java    |  24 ++--
 8 files changed, 256 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 075fe7e..7145ff2 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-594 Process lineage information for Retention policies
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-325 Process lineage information for Replication policies
    (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 452872e..e7670da 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -20,8 +20,10 @@ package org.apache.falcon.metadata;
 
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
+import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.common.FeedDataPath;
@@ -32,6 +34,8 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.retention.EvictionHelper;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
@@ -202,6 +206,56 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE, context.getTimeStampAsISO8601());
     }
 
+    /**
+     * Records lineage for a retention (eviction) run: for every instance path listed in the
+     * eviction csv log file, adds an evicted-from edge from the feed instance vertex to the cluster.
+     *
+     * @param context post-processing context of the retention workflow; its output feed names carry
+     *                the single evicted feed and its log file lists the evicted instance paths
+     * @throws FalconException if the eviction log cannot be read or parsed
+     * @throws IllegalArgumentException if the context carries no log file path
+     * @throws IllegalStateException if an evicted instance has no feed instance vertex
+     */
+    public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
+        String outputFeedNamesArg = context.getOutputFeedNames();
+        if (StringUtils.isEmpty(outputFeedNamesArg) || "NONE".equals(outputFeedNamesArg)) {
+            LOG.info("There are no output feeds for this process, return");
+            return;
+        }
+
+        String logFile = context.getLogFile();
+        if (StringUtils.isEmpty(logFile)) {
+            throw new IllegalArgumentException("csv log file path empty");
+        }
+
+        String clusterName = context.getClusterName();
+        String[] paths = EvictionHelper.getInstancePaths(ClusterHelper.getFileSystem(clusterName), new Path(logFile));
+        if (paths == null || paths.length == 0) {
+            // A retention run that deleted nothing is a normal outcome, not an error.
+            LOG.info("No evicted instances recorded for feed {} in cluster {}, nothing to add",
+                    outputFeedNamesArg, clusterName);
+            return;
+        }
+
+        // For retention there will be only one output feed name
+        String feedName = outputFeedNamesArg;
+        RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        for (String feedInstanceDataPath : paths) {
+            LOG.info("Computing feed instance for : name={}, path= {}, in cluster: {}",
+                    feedName, feedInstanceDataPath, clusterName);
+            String feedInstanceName = getFeedInstanceName(feedName, clusterName, feedInstanceDataPath);
+            Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+
+            LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
+            if (feedInstanceVertex == null) {
+                throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+            }
+
+            addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
+                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE, context.getTimeStampAsISO8601());
+        }
+    }
+
     private void addFeedInstance(Vertex processInstance, RelationshipLabel edgeLabel,
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index ab82ce1..f607e0a 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -293,8 +293,8 @@ public class MetadataMappingService
         instanceGraphBuilder.addReplicatedInstance(context);
     }
 
-    private void onFeedInstanceEvicted(WorkflowExecutionContext context) {
+    private void onFeedInstanceEvicted(WorkflowExecutionContext context) throws FalconException {
         LOG.info("Adding evicted feed instance: {}", context.getNominalTimeAsISO8601());
-        // todo - tbd
+        instanceGraphBuilder.addEvictedInstance(context); // evicted-from edge per instance in the csv log
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
index acd764f..5b312da 100644
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
+++ b/common/src/main/java/org/apache/falcon/metadata/RelationshipLabel.java
@@ -39,7 +39,10 @@ public enum RelationshipLabel {
     PIPELINES("pipeline"),
 
     // replication labels
-    FEED_CLUSTER_REPLICATED_EDGE("replicated-to");
+    FEED_CLUSTER_REPLICATED_EDGE("replicated-to"),
+
+    // eviction labels
+    FEED_CLUSTER_EVICTED_EDGE("evicted-from");
 
     private final String name;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
new file mode 100644
index 0000000..5d6481c
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper methods to facilitate eviction.
+ *
+ * <p>{@link #logInstancePaths} records the instances deleted by a retention run as
+ * {@code instancePaths=<path1>,<path2>,...} and {@link #getInstancePaths} parses that back.</p>
+ */
+public final class EvictionHelper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
+
+    /** Prefix written in front of the instance path list. */
+    private static final String INSTANCEPATH_FORMAT = "instancePaths=";
+    /** Separator between instance paths in the list. */
+    public static final String INSTANCEPATH_SEPARATOR = ",";
+    /** Charset used both to write and to read the log file. */
+    private static final String ENCODING = "UTF-8";
+
+    private EvictionHelper() {}
+
+    /**
+     * Writes the {@code instancePaths=} prefix followed by {@code data} to {@code path}.
+     *
+     * @param logfs file system holding the log file
+     * @param path  log file to create
+     * @param data  separator-delimited instance paths; {@code null} is written as an empty list
+     * @throws IOException if the file cannot be written
+     */
+    public static void logInstancePaths(final FileSystem logfs, final Path path,
+                                        final String data) throws IOException {
+        LOG.info("Writing deleted instances to path {}", path);
+        OutputStream out = logfs.create(path);
+        try {
+            out.write(INSTANCEPATH_FORMAT.getBytes(ENCODING));
+            if (data != null) {
+                out.write(data.getBytes(ENCODING));
+            }
+            out.close();
+            out = null;
+        } finally {
+            // Non-null only if a write or close failed: release the stream without masking that error.
+            IOUtils.closeStream(out);
+        }
+        debug(logfs, path);
+    }
+
+    /**
+     * Reads back the instance paths written by {@link #logInstancePaths}.
+     *
+     * @param fs      file system holding the log file
+     * @param logFile log file to read
+     * @return the instance paths, without blank entries; empty if nothing was evicted
+     * @throws FalconException if the file cannot be read, is empty, or lacks the expected prefix
+     */
+    public static String[] getInstancePaths(final FileSystem fs,
+                                            final Path logFile) throws FalconException {
+        String logData = readLog(fs, logFile);
+        if (StringUtils.isEmpty(logData)) {
+            throw new FalconException("csv file is empty");
+        }
+        if (!logData.startsWith(INSTANCEPATH_FORMAT)) {
+            throw new FalconException("Instance path in csv file not in required format: " + logData);
+        }
+
+        // The list is empty when nothing was evicted, and callers such as FeedEvictor append a
+        // separator after every path, so blank entries are skipped rather than returned.
+        String pathList = logData.substring(INSTANCEPATH_FORMAT.length());
+        List<String> paths = new ArrayList<String>();
+        for (String instancePath : pathList.split(INSTANCEPATH_SEPARATOR)) {
+            if (!StringUtils.isBlank(instancePath)) {
+                paths.add(instancePath);
+            }
+        }
+        return paths.toArray(new String[paths.size()]);
+    }
+
+    /** Reads the whole log file as a string, wrapping I/O failures in a {@link FalconException}. */
+    private static String readLog(final FileSystem fs, final Path logFile) throws FalconException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        try {
+            InputStream in = fs.open(logFile);
+            IOUtils.copyBytes(in, buffer, 4096, true);
+            return buffer.toString(ENCODING);
+        } catch (IOException e) {
+            throw new FalconException(e);
+        }
+    }
+
+    /** Logs the contents of the written file at debug level; skips re-reading it when debug is off. */
+    private static void debug(final FileSystem fs, final Path outPath) throws IOException {
+        if (!LOG.isDebugEnabled()) {
+            return;
+        }
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(outPath);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        LOG.debug("Instance Paths copied to {}", outPath);
+        LOG.debug("Written {}", writer.toString(ENCODING));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3f3f539..f49ada0 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,6 +23,7 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,6 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.retention.EvictionHelper;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
@@ -46,6 +48,7 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -65,7 +68,8 @@ import java.util.Set;
 public class MetadataMappingServiceTest {
 
     public static final String FALCON_USER = "falcon-user";
-    private static final String LOGS_DIR = "target/log";
+    private static final String LOGS_DIR = "/falcon/staging/feed/logs";
+    private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
     private static final String NOMINAL_TIME = "2014-01-01-01-00";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -74,6 +78,7 @@ public class MetadataMappingServiceTest {
     public static final String COLO_NAME = "west-coast";
     public static final String GENERATE_WORKFLOW_NAME = "imp-click-join-workflow";
     public static final String REPLICATION_WORKFLOW_NAME = "replication-policy-workflow";
+    private static final String EVICTION_WORKFLOW_NAME = "eviction-policy-workflow";
     public static final String WORKFLOW_VERSION = "1.0.9";
 
     public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
@@ -86,6 +91,8 @@ public class MetadataMappingServiceTest {
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
     private static final String REPLICATED_OUTPUT_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101";
+    private static final String EVICTED_OUTPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
 
     public static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
 
@@ -97,6 +104,9 @@ public class MetadataMappingServiceTest {
     private List<Feed> inputFeeds = new ArrayList<Feed>();
     private List<Feed> outputFeeds = new ArrayList<Feed>();
     private Process processEntity;
+    private EmbeddedCluster embeddedCluster;
+    private String hdfsUrl;
+    private static String logFilePath;
 
 
     @BeforeClass
@@ -253,6 +263,40 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 74);
     }
 
+    /** Retention post-processing should add an evicted-from edge for every evicted instance. */
+    @Test
+    public void testLineageForRetention() throws Exception {
+        setupForLineageEviciton();
+        String feedName = "imp-click-join1";
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                        feedName, "IGNORE", "IGNORE", feedName),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        // logFilePath is static: clear it so tests running later use the default log file again
+        logFilePath = null;
+        service.onSuccess(context);
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        List<String> expectedFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+        List<String> secureFeeds = Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                "clicks-feed/2014-01-01T00:00Z");
+        List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
+        verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
+        String[] paths = EVICTED_OUTPUT_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+        for (String feedInstanceDataPath : paths) {
+            verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+                    RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
+        }
+        // No new vertices added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 23);
+        // Edges are +1 net: +2 evicted-from edges (one per evicted instance path) to the cluster, and
+        // -1 because setup generates imp-click-join1 twice instead of imp-click-join2, so there is
+        // no classified-as -> Secure edge for the second output instance.
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
+    }
+
     @Test (dependsOnMethods = "testOnAdd")
     public void testOnChange() throws Exception {
         // shutdown the graph and resurrect for testing
@@ -647,11 +691,20 @@ public class MetadataMappingServiceTest {
     }
 
     private void verifyLineageGraph(String feedType) {
+        List<String> ownedAndFinancial = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+        List<String> classifiedSecure = Arrays.asList("impression-feed/2014-01-01T00:00Z",
+                "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+        List<String> ownedByUser = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+        verifyLineageGraph(feedType, ownedByUser, classifiedSecure, ownedAndFinancial);
+    }
+
+    private void verifyLineageGraph(String feedType, List<String> expectedFeeds,
+                                    List<String> secureFeeds, List<String> ownedAndSecureFeeds) {
         // feeds owned by a user
         List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType);
-        List<String> expected = Arrays.asList("impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z",
-                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
-        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expectedFeeds));
 
         Graph graph = service.getGraph();
 
@@ -666,14 +719,10 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(vertexById, feedInstanceVertex);
 
         // feeds classified as secure
-        verifyFeedsClassifiedAsSecure(feedType,
-                Arrays.asList("impression-feed/2014-01-01T00:00Z",
-                        "clicks-feed/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
+        verifyFeedsClassifiedAsSecure(feedType, secureFeeds);
 
         // feeds owned by a user and classified as secure
-        verifyFeedsOwnedByUserAndClassification(feedType, "Financial",
-                Arrays.asList("clicks-feed/2014-01-01T00:00Z",
-                        "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z"));
+        verifyFeedsOwnedByUserAndClassification(feedType, "Financial", ownedAndSecureFeeds);
     }
 
     private void verifyLineageGraphForReplicationOrEviction(String feedName, String feedInstanceDataPath,
@@ -739,7 +788,8 @@ public class MetadataMappingServiceTest {
             "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
 
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
-            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
+            (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
         };
     }
 
@@ -751,27 +801,30 @@ public class MetadataMappingServiceTest {
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
 
+        addFeedsAndProcess(clusterEntity);
+    }
+
+    private void addFeedsAndProcess(Cluster cluster) throws Exception {
-        // Add input and output feeds
+        // Add input and output feeds, then the process entity, all bound to the given cluster
-        Feed impressionsFeed = addFeedEntity("impression-feed", clusterEntity,
+        Feed impressionsFeed = addFeedEntity("impression-feed", cluster,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
-        Feed clicksFeed = addFeedEntity("clicks-feed", clusterEntity,
+        Feed clicksFeed = addFeedEntity("clicks-feed", cluster,
                 "classified-as=Secure,classified-as=Financial", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/clicks-feed/${YEAR}-${MONTH}-${DAY}");
         inputFeeds.add(clicksFeed);
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+        Feed join1Feed = addFeedEntity("imp-click-join1", cluster,
                 "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join1Feed);
-        Feed join2Feed = addFeedEntity("imp-click-join2", clusterEntity,
+        Feed join2Feed = addFeedEntity("imp-click-join2", cluster,
                 "classified-as=Secure,classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
                 "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
         outputFeeds.add(join2Feed);
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, cluster,
                 "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
                 WORKFLOW_VERSION);
-
     }
 
     private void setupForLineageReplication() throws Exception {
@@ -785,6 +838,33 @@ public class MetadataMappingServiceTest {
         addClusterEntity(BCP_CLUSTER_ENTITY_NAME, "east-coast", "classification=bcp");
     }
 
+    private void setupForLineageEviciton() throws Exception {
+        cleanUp();
+        service.init();
+
+        // Add a cluster backed by the embedded jail:// file system so the csv log can be written to it
+        embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
+                "classification=production");
+        clusterEntity = embeddedCluster.getCluster();
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+        hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
+
+        addFeedsAndProcess(clusterEntity);
+
+        // GENERATE WF must run first: eviction requires the feed instance vertices to already exist
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME,
+                        "imp-click-join1,imp-click-join1", EVICTED_OUTPUT_INSTANCE_PATHS, null, null),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        // Write the evicted paths to the csv log; logFilePath is also picked up by getTestMessageArgs
+        String csvData = EVICTED_OUTPUT_INSTANCE_PATHS;
+        logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
+        Path path = new Path(logFilePath);
+        EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
+    }
+
     private void cleanUp() throws Exception {
         cleanupGraphStore(service.getGraph());
         cleanupConfigurationStore(configStore);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 4de7938..114071f 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
@@ -48,10 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.servlet.jsp.el.ELException;
 import javax.servlet.jsp.el.ExpressionEvaluator;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.text.DateFormat;
 import java.text.ParseException;
@@ -107,7 +103,7 @@ public class FeedEvictor extends Configured implements Tool {
     }
 
     private final Map<VARS, String> map = new TreeMap<VARS, String>();
-    private final StringBuffer instancePaths = new StringBuffer("instancePaths=");
+    private final StringBuffer instancePaths = new StringBuffer();
     private final StringBuffer buffer = new StringBuffer();
 
     @Override
@@ -129,7 +125,8 @@ public class FeedEvictor extends Configured implements Tool {
         Storage storage = FeedHelper.createStorage(feedStorageType, feedPattern);
         evict(storage, retentionLimit, timeZone);
 
-        logInstancePaths(new Path(logFile));
+        Path path = new Path(logFile);
+        EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
 
         int len = buffer.length();
         if (len > 0) {
@@ -183,7 +180,7 @@ public class FeedEvictor extends Configured implements Tool {
             deleteInstance(fs, path, feedBasePath);
             Date date = getDate(path, feedPath, dateMask, timeZone);
             buffer.append(dateFormat.format(date)).append(',');
-            instancePaths.append(path).append(",");
+            instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
         }
     }
 
@@ -197,15 +194,6 @@ public class FeedEvictor extends Configured implements Tool {
 
     }
 
-    private void logInstancePaths(Path path) throws IOException {
-        LOG.info("Writing deleted instances to path {}", path);
-        FileSystem logfs = path.getFileSystem(getConf());
-        OutputStream out = logfs.create(path);
-        out.write(instancePaths.toString().getBytes());
-        out.close();
-        debug(logfs, path);
-    }
-
     private Pair<Date, Date> getDateRange(String period) throws ELException {
         Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
                 Long.class, RESOLVER, RESOLVER);
@@ -330,14 +318,6 @@ public class FeedEvictor extends Configured implements Tool {
         deleteParentIfEmpty(fs, path.getParent(), feedBasePath);
     }
 
-    private void debug(FileSystem fs, Path outPath) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(outPath);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        LOG.debug("Instance Paths copied to {}", outPath);
-        LOG.debug("Written {}", writer);
-    }
-
     private CommandLine getCommand(String[] args) throws org.apache.commons.cli.ParseException {
         Options options = new Options();
 
@@ -552,7 +532,7 @@ public class FeedEvictor extends Configured implements Tool {
                 String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
                 LOG.info("Deleted partition: " + partitionInfo);
                 buffer.append(partSpec).append(',');
-                instancePaths.append(partitionInfo).append(",");
+                instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d2e5f8c9/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index 9512fa8..29c2ec4 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -58,11 +58,11 @@ public class EmbeddedCluster {
     }
 
     public static EmbeddedCluster newCluster(final String name) throws Exception {
-        return createClusterAsUser(name, false);
+        return newCluster(name, false, null, null);
     }
 
     public static EmbeddedCluster newCluster(final String name, boolean global) throws Exception {
-        return createClusterAsUser(name, global);
+        return newCluster(name, global, null, null);
     }
 
     public static EmbeddedCluster newCluster(final String name,
@@ -71,18 +71,24 @@ public class EmbeddedCluster {
         return hdfsUser.doAs(new PrivilegedExceptionAction<EmbeddedCluster>() {
             @Override
             public EmbeddedCluster run() throws Exception {
-                return createClusterAsUser(name, false);
+                return createClusterAsUser(name, false, null, null);
             }
         });
     }
 
-    private static EmbeddedCluster createClusterAsUser(String name, boolean global) throws IOException {
+    public static EmbeddedCluster newCluster(final String name, boolean global, final String colo,
+                                             final String tags) throws Exception {
+        return createClusterAsUser(name, global, colo, tags); // null colo -> "local"; null tags -> none set
+    }
+
+    private static EmbeddedCluster createClusterAsUser(String name, boolean global, String colo,
+                                                       String tags) throws IOException {
         EmbeddedCluster cluster = new EmbeddedCluster();
         cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
 
         String hdfsUrl = cluster.conf.get("fs.default.name");
         LOG.info("Cluster Namenode = {}", hdfsUrl);
-        cluster.buildClusterObject(name);
+        cluster.buildClusterObject(name, colo, tags); // colo and tags may be null
         return cluster;
     }
 
@@ -90,10 +96,14 @@ public class EmbeddedCluster {
         return FileSystem.get(conf);
     }
 
-    protected void buildClusterObject(String name) {
+    protected void buildClusterObject(String name, String colo, String tags) {
         clusterEntity = new Cluster();
         clusterEntity.setName(name);
-        clusterEntity.setColo("local");
+        clusterEntity.setColo((colo == null) ? "local" : colo);
+        clusterEntity.setDescription("Embedded cluster: " + name);
+        if (tags != null) {
+            clusterEntity.setTags(tags);
+        }
         clusterEntity.setDescription("Embedded cluster: " + name);
 
         Interfaces interfaces = new Interfaces();