You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:57 UTC

[3/4] git commit: FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh

FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/00c6f1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/00c6f1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/00c6f1e5

Branch: refs/heads/master
Commit: 00c6f1e5d3b3a92f7efbf872d330bb88f1df552c
Parents: a94cb7a
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 15:04:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/falcon/entity/ClusterHelper.java |  20 +---
 .../InstanceRelationshipGraphBuilder.java       |  68 ++++++-----
 .../falcon/metadata/MetadataMappingService.java |   1 +
 .../falcon/retention/EvictedInstanceSerDe.java  | 118 +++++++++++++++++++
 .../apache/falcon/retention/EvictionHelper.java |  88 --------------
 .../falcon/workflow/WorkflowExecutionArgs.java  |   2 +-
 .../workflow/WorkflowExecutionContext.java      |   2 +-
 .../metadata/MetadataMappingServiceTest.java    |  90 +++++++-------
 .../falcon/messaging/JMSMessageProducer.java    |  20 +---
 .../apache/falcon/retention/FeedEvictor.java    |   8 +-
 11 files changed, 216 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 523b218..a2bd724 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-731 Lineage capture for evicted instance is broken
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-724 Build fails as Integration test fails (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index cb3ea08..2689cb7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,13 +18,13 @@
 
 package org.apache.falcon.entity;
 
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
+import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.util.HashMap;
@@ -40,12 +40,6 @@ public final class ClusterHelper {
     private ClusterHelper() {
     }
 
-    public static FileSystem getFileSystem(String cluster) throws FalconException {
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
-        return HadoopClientFactory.get().createProxiedFileSystem(conf);
-    }
-
     public static Configuration getConfiguration(Cluster cluster) {
         Configuration conf = new Configuration();
 
@@ -116,10 +110,6 @@ public final class ClusterHelper {
         return normalizedPath.substring(0, normalizedPath.length() - 1);
     }
 
-    public static String getCompleteLocation(Cluster cluster, String locationKey) {
-        return getStorageUrl(cluster) + "/" + getLocation(cluster, locationKey);
-    }
-
     public static String getLocation(Cluster cluster, String locationKey) {
         for (Location loc : cluster.getLocations().getLocations()) {
             if (loc.getName().equals(locationKey)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 4d9fbcf..5b5d62c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Vertex;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.common.FeedDataPath;
@@ -34,12 +33,10 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 
@@ -51,6 +48,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
 
     private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
+    private static final String IGNORE = "IGNORE";
 
     // process workflow properties from message
     private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
@@ -207,39 +205,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
-        String outputFeedNamesArg = context.getOutputFeedNames();
-        if ("NONE".equals(outputFeedNamesArg)) {
-            LOG.info("There are no output feeds for this process, return");
+        final String outputFeedPaths = context.getOutputFeedInstancePaths();
+        if (IGNORE.equals(outputFeedPaths)) {
+            LOG.info("There were no evicted instances, nothing to record");
             return;
         }
 
-        String logFile = context.getLogFile();
-        if (StringUtils.isEmpty(logFile)){
-            throw new IllegalArgumentException("csv log file path empty");
-        }
-
+        LOG.info("Recording lineage for evicted instances {}", outputFeedPaths);
+        // For retention there will be only one output feed name
+        String feedName = context.getOutputFeedNames();
+        String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList();
         String clusterName = context.getClusterName();
-        String[] paths = EvictionHelper.getInstancePaths(
-                ClusterHelper.getFileSystem(clusterName), new Path(logFile));
-        if (paths == null || paths.length <= 0) {
-            throw new IllegalArgumentException("No instance paths in log file");
-        }
 
-        // For retention there will be only one output feed name
-        String feedName = outputFeedNamesArg;
-        for (String feedInstanceDataPath : paths) {
-            LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                    + feedInstanceDataPath + ", in cluster: " + clusterName);
-            RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        for (String evictedFeedInstancePath : evictedFeedInstancePathList) {
+            LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}",
+                    feedName, evictedFeedInstancePath, clusterName);
             String feedInstanceName = getFeedInstanceName(feedName, clusterName,
-                    feedInstanceDataPath, context.getNominalTimeAsISO8601());
-            Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+                    evictedFeedInstancePath, context.getNominalTimeAsISO8601());
+            Vertex feedInstanceVertex = findVertex(feedInstanceName,
+                    RelationshipType.FEED_INSTANCE);
 
             LOG.info("Vertex exists? name={}, type={}, v={}",
-                    feedInstanceName, vertexType, feedInstanceVertex);
-            if (feedInstanceVertex == null) {
-                throw new IllegalStateException(vertexType
-                        + " instance vertex must exist " + feedInstanceName);
+                    feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+            if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+                LOG.info("{} instance vertex {} does not exist, add it",
+                        RelationshipType.FEED_INSTANCE, feedInstanceName);
+                feedInstanceVertex = addFeedInstance(// add a new instance
+                        feedInstanceName, context, feedName, clusterName);
             }
 
             addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -251,16 +243,20 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {
         String clusterName = context.getClusterName();
-        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + clusterName);
+        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+                feedInstanceDataPath, clusterName);
         String feedInstanceName = getFeedInstanceName(feedName, clusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        LOG.info("Adding feed instance: " + feedInstanceName);
+        Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+    }
+
+    private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
+                                   String feedName, String clusterName) throws FalconException {
+        LOG.info("Adding feed instance {}", feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
                 context.getTimeStampAsISO8601());
 
-        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
         addInstanceToEntity(feedInstance, feedName,
                 RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
         addInstanceToEntity(feedInstance, clusterName,
@@ -273,6 +269,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             addDataClassification(feed.getTags(), feedInstance);
             addGroups(feed.getGroups(), feedInstance);
         }
+
+        return feedInstance;
     }
 
     public static String getFeedInstanceName(String feedName, String clusterName,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index f607e0a..46f8a61 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -271,6 +271,7 @@ public class MetadataMappingService
 
         case DELETE:
             onFeedInstanceEvicted(context);
+            getTransactionalGraph().commit();
             break;
 
         default:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
new file mode 100644
index 0000000..c2f222b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Utility class for serializing and deserializing the evicted instance paths.
+ */
+
+public final class EvictedInstanceSerDe {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
+
+    private static final String INSTANCEPATH_PREFIX = "instancePaths=";
+    private static final String INSTANCES_SEPARATOR = "=";
+    public static final String INSTANCEPATH_SEPARATOR = ",";
+
+
+    private EvictedInstanceSerDe() {}
+
+    /**
+     * This method serializes the evicted instances to a file in logs dir for a given feed.
+     * @see org.apache.falcon.retention.FeedEvictor
+     *
+     * *Note:* This is executed within the map task for the evictor action
+     *
+     * @param fileSystem file system handle
+     * @param logFilePath       File path to serialize the instances to
+     * @param instances  list of instances, comma separated
+     * @throws IOException
+     */
+    public static void serializeEvictedInstancePaths(final FileSystem fileSystem,
+                                                     final Path logFilePath,
+                                                     StringBuffer instances) throws IOException {
+        LOG.info("Writing deleted instances {} to path {}", instances, logFilePath);
+        OutputStream out = null;
+        try {
+            out = fileSystem.create(logFilePath);
+            instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
+            out.write(instances.toString().getBytes());
+        } finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            logEvictedInstancePaths(fileSystem, logFilePath);
+        }
+    }
+
+    private static void logEvictedInstancePaths(final FileSystem fs,
+                                                final Path outPath) throws IOException {
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(outPath);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        LOG.debug("Instance Paths copied to {}", outPath);
+        LOG.debug("Written {}", writer);
+    }
+
+    /**
+     * This method deserializes the evicted instances from a log file on hdfs.
+     * @see org.apache.falcon.messaging.JMSMessageProducer
+     * *Note:* This is executed within the falcon server
+     *
+     * @param fileSystem file system handle
+     * @param logFile    File path to deserialize the instances from; deleted after it is read
+     * @return array of evicted instance paths, empty if none were recorded
+     * @throws IOException
+     */
+    public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
+                                                           final Path logFile) throws IOException {
+        try {
+            ByteArrayOutputStream writer = new ByteArrayOutputStream();
+            InputStream instance = fileSystem.open(logFile);
+            IOUtils.copyBytes(instance, writer, 4096, true);
+            String[] instancePaths = writer.toString().split(INSTANCES_SEPARATOR);
+
+            LOG.info("Deleted feed instance paths file:" + logFile);
+            if (instancePaths.length == 1) {
+                LOG.debug("Returning 0 instance paths for feed ");
+                return new String[0];
+            } else {
+                LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+                return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
+            }
+        } finally {
+            // clean up the serialized state
+            fileSystem.delete(logFile, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
deleted file mode 100644
index 5d6481c..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Helper methods to facilitate eviction.
- */
-
-public final class EvictionHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
-
-    private static final String INSTANCEPATH_FORMAT = "instancePaths=";
-    public static final String INSTANCEPATH_SEPARATOR = ",";
-
-
-    private EvictionHelper() {}
-
-    public static void logInstancePaths(final FileSystem logfs, final Path path,
-                                        final String data) throws IOException {
-        LOG.info("Writing deleted instances to path {}", path);
-        OutputStream out = logfs.create(path);
-        out.write(INSTANCEPATH_FORMAT.getBytes());
-        out.write(data.getBytes());
-        out.close();
-        debug(logfs, path);
-    }
-
-    public static String[] getInstancePaths(final FileSystem fs,
-                                            final Path logFile) throws FalconException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        try {
-            InputStream date = fs.open(logFile);
-            IOUtils.copyBytes(date, writer, 4096, true);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-        String logData = writer.toString();
-        if (StringUtils.isEmpty(logData)) {
-            throw new FalconException("csv file is empty");
-        }
-
-        String[] parts = logData.split(INSTANCEPATH_FORMAT);
-        if (parts.length != 2) {
-            throw new FalconException("Instance path in csv file not in required format: " + logData);
-        }
-
-        // part[0] is instancePaths=
-        return parts[1].split(INSTANCEPATH_SEPARATOR);
-    }
-
-    private static void debug(final FileSystem fs, final Path outPath) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(outPath);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        LOG.debug("Instance Paths copied to {}", outPath);
-        LOG.debug("Written {}", writer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 514bafe..0a8be64 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -75,7 +75,7 @@ public enum WorkflowExecutionArgs {
     BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
 
     // state maintained
-    LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
+    LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),
     // execution context data recorded
     LOG_DIR("logDir", "log dir where lineage can be recorded"),
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 04ef037..ef55ba9 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -87,7 +87,7 @@ public class WorkflowExecutionContext {
         WorkflowExecutionArgs.RUN_ID,
         WorkflowExecutionArgs.STATUS,
         WorkflowExecutionArgs.TIMESTAMP,
-        WorkflowExecutionArgs.LOG_FILE,
+        WorkflowExecutionArgs.LOG_DIR,
     };
 
     private final Map<WorkflowExecutionArgs, String> context;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3b9fdba..895a5f7 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -41,7 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
@@ -49,7 +48,6 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -70,7 +68,6 @@ public class MetadataMappingServiceTest {
 
     public static final String FALCON_USER = "falcon-user";
     private static final String LOGS_DIR = "/falcon/staging/feed/logs";
-    private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
     private static final String NOMINAL_TIME = "2014-01-01-01-00";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -93,7 +90,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-    private static final String REPLICATED_INSTANCE = "raw-click";
+    private static final String REPLICATED_FEED = "raw-click";
+    private static final String EVICTED_FEED = "imp-click-join1";
     private static final String EVICTED_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
     public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
@@ -109,10 +107,6 @@ public class MetadataMappingServiceTest {
     private List<Feed> inputFeeds = new ArrayList<Feed>();
     private List<Feed> outputFeeds = new ArrayList<Feed>();
     private Process processEntity;
-    private EmbeddedCluster embeddedCluster;
-    private String hdfsUrl;
-    private static String logFilePath;
-
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -121,6 +115,8 @@ public class MetadataMappingServiceTest {
         configStore = ConfigurationStore.get();
 
         Services.get().register(new WorkflowJobEndNotificationService());
+        StartupProperties.get().setProperty("falcon.graph.storage.directory",
+                "target/graphdb-" + System.currentTimeMillis());
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
         service = new MetadataMappingService();
         service.init();
@@ -258,7 +254,8 @@ public class MetadataMappingServiceTest {
         GraphUtils.dump(service.getGraph());
 
         // Verify if instance name has nominal time
-        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
+                RelationshipType.FEED_INSTANCE.getName());
         List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
                 "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
         Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
@@ -270,20 +267,20 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
-    public void  testLineageForReplication() throws Exception {
+    public void testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
                         "jail://global:00/falcon/raw-click/bcp/20140101",
-                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
 
-        verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
                 "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
@@ -303,12 +300,11 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
-    public void   testLineageForRetention() throws Exception {
-        setupForLineageEviciton();
-        String feedName = "imp-click-join1";
+    public void testLineageForRetention() throws Exception {
+        setupForLineageEviction();
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
-                        feedName, "IGNORE", "IGNORE", feedName),
+                        EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
 
         service.onSuccess(context);
@@ -322,9 +318,9 @@ public class MetadataMappingServiceTest {
         List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
                 "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
-        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         for (String feedInstanceDataPath : paths) {
-            verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+            verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
                     RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
         }
 
@@ -336,6 +332,27 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
     }
 
+    @Test
+    public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
+        cleanUp();
+        service.init();
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                        EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        // No new vertices added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
+        // No new edges added
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
+    }
+
     @Test (dependsOnMethods = "testOnAdd")
     public void testOnChange() throws Exception {
         // shutdown the graph and resurrect for testing
@@ -673,7 +690,9 @@ public class MetadataMappingServiceTest {
         // feeds owned by a user
         List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName());
         Assert.assertEquals(feedNamesOwnedByUser,
-                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", "imp-click-join2"));
+                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
+                        "imp-click-join2")
+        );
 
         // feeds classified as secure
         verifyFeedsClassifiedAsSecure(feedType.getName(),
@@ -736,7 +755,8 @@ public class MetadataMappingServiceTest {
                 }
             }
         }
-        Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+        Assert.assertTrue(actual.containsAll(expected),
+                "Actual does not contain expected: " + actual);
     }
 
     public long getVerticesCount(final Graph graph) {
@@ -855,8 +875,6 @@ public class MetadataMappingServiceTest {
             "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
 
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
-            "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
-            (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
         };
     }
 
@@ -907,7 +925,7 @@ public class MetadataMappingServiceTest {
         Cluster[] clusters = {clusterEntity, bcpCluster};
 
         // Add feed
-        Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+        Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
         // Add uri template for each cluster
@@ -945,22 +963,12 @@ public class MetadataMappingServiceTest {
                 EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
                 "jail://global:00/falcon/imp-click-join1/20140101",
                 "jail://global:00/falcon/raw-click/primary/20140101",
-                REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
+                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
     }
 
-    private void setupForLineageEviciton() throws Exception {
-        cleanUp();
-        service.init();
-
-        // Add cluster
-        embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
-                "classification=production");
-        clusterEntity = embeddedCluster.getCluster();
-        configStore.publish(EntityType.CLUSTER, clusterEntity);
-        hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
-
-        addFeedsAndProcess(clusterEntity);
+    private void setupForLineageEviction() throws Exception {
+        setup();
 
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -968,12 +976,6 @@ public class MetadataMappingServiceTest {
                         "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
-
-        // Write to csv file
-        String csvData = EVICTED_INSTANCE_PATHS;
-        logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
-        Path path = new Path(logFilePath);
-        EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
     }
 
     private void setupForNoDateInFeedPath() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 0181e74..a60e951 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,12 +18,12 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +35,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.Topic;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -290,19 +288,7 @@ public class JMSMessageProducer {
             return new String[0];
         }
 
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(logFile);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        String[] instancePaths = writer.toString().split("=");
-        fs.delete(logFile, true);
-        LOG.info("Deleted feed instance paths file:" + logFile);
-        if (instancePaths.length == 1) {
-            LOG.debug("Returning 0 instance paths for feed ");
-            return new String[0];
-        } else {
-            LOG.debug("Returning instance paths for feed " + instancePaths[1]);
-            return instancePaths[1].split(",");
-        }
+        return EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, logFile);
     }
 
     private Map<String, String> buildMessage(final WorkflowExecutionArgs[] filter) {
@@ -311,6 +297,8 @@ public class JMSMessageProducer {
             message.put(arg.getName(), context.getValue(arg));
         }
 
+        // drop the log file arg: it is NOT useful since the file is deleted after the message is sent
+        message.remove(WorkflowExecutionArgs.LOG_FILE.getName());
         return message;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 114071f..9589edf 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -126,7 +126,8 @@ public class FeedEvictor extends Configured implements Tool {
         evict(storage, retentionLimit, timeZone);
 
         Path path = new Path(logFile);
-        EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
+        EvictedInstanceSerDe.serializeEvictedInstancePaths(
+                path.getFileSystem(getConf()), path, instancePaths);
 
         int len = buffer.length();
         if (len > 0) {
@@ -180,7 +181,7 @@ public class FeedEvictor extends Configured implements Tool {
             deleteInstance(fs, path, feedBasePath);
             Date date = getDate(path, feedPath, dateMask, timeZone);
             buffer.append(dateFormat.format(date)).append(',');
-            instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+            instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         }
     }
 
@@ -532,7 +533,7 @@ public class FeedEvictor extends Configured implements Tool {
                 String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
                 LOG.info("Deleted partition: " + partitionInfo);
                 buffer.append(partSpec).append(',');
-                instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+                instancePaths.append(partitionInfo).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
             }
         }
     }
@@ -553,5 +554,4 @@ public class FeedEvictor extends Configured implements Tool {
             }
         }
     }
-
 }