You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/10/02 02:51:56 UTC

[4/5] git commit: FALCON-760 Messaging is broken for FALCON.ENTITY.TOPIC in case of Eviction. Contributed by Sowmya Ramesh

FALCON-760 Messaging is broken for FALCON.ENTITY.TOPIC in case of Eviction. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/88d92488
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/88d92488
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/88d92488

Branch: refs/heads/master
Commit: 88d9248864a202ed71db8b64b047fe000606bd05
Parents: 3f55972
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Oct 1 16:34:54 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Oct 1 17:42:21 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../falcon/retention/EvictedInstanceSerDe.java  | 35 ++++---
 .../retention/EvictedInstanceSerDeTest.java     | 98 ++++++++++++++++++++
 .../falcon/messaging/JMSMessageProducer.java    | 27 +-----
 .../messaging/JMSMessageProducerTest.java       |  8 +-
 .../resources/action/feed/eviction-action.xml   |  2 +-
 .../src/main/resources/action/post-process.xml  |  2 +-
 .../workflow/FalconPostProcessingTest.java      | 14 +--
 .../apache/falcon/retention/FeedEvictor.java    | 27 +++++-
 .../falcon/retention/FeedEvictorTest.java       |  6 +-
 .../lifecycle/TableStorageFeedEvictorIT.java    |  8 +-
 11 files changed, 162 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4338337..87f657b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -101,6 +101,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-760 Messaging is broken for FALCON.ENTITY.TOPIC in case of Eviction
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-768 Change dashboard USER_ID to falcon-dashboard (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
index c2f222b..b4d46c4 100644
--- a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
+++ b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
@@ -20,6 +20,8 @@ package org.apache.falcon.retention;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +39,7 @@ public final class EvictedInstanceSerDe {
 
     private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
 
-    private static final String INSTANCEPATH_PREFIX = "instancePaths=";
-    private static final String INSTANCES_SEPARATOR = "=";
+    public static final String INSTANCEPATH_PREFIX = "instancePaths=";
     public static final String INSTANCEPATH_SEPARATOR = ",";
 
 
@@ -64,6 +65,10 @@ public final class EvictedInstanceSerDe {
             out = fileSystem.create(logFilePath);
             instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
             out.write(instances.toString().getBytes());
+
+            // To make sure log cleaning service can delete this file
+            FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL);
+            fileSystem.setPermission(logFilePath, permission);
         } finally {
             if (out != null) {
                 out.close();
@@ -96,23 +101,17 @@ public final class EvictedInstanceSerDe {
      */
     public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
                                                            final Path logFile) throws IOException {
-        try {
-            ByteArrayOutputStream writer = new ByteArrayOutputStream();
-            InputStream instance = fileSystem.open(logFile);
-            IOUtils.copyBytes(instance, writer, 4096, true);
-            String[] instancePaths = writer.toString().split(INSTANCES_SEPARATOR);
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fileSystem.open(logFile);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        String[] instancePaths = writer.toString().split(INSTANCEPATH_PREFIX);
 
-            LOG.info("Deleted feed instance paths file:" + logFile);
-            if (instancePaths.length == 1) {
-                LOG.debug("Returning 0 instance paths for feed ");
-                return new String[0];
-            } else {
-                LOG.debug("Returning instance paths for feed " + instancePaths[1]);
-                return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
-            }
-        } finally {
-            // clean up the serialized state
-            fileSystem.delete(logFile, true);
+        if (instancePaths.length <= 1) {
+            LOG.info("Returning 0 instance paths for feed ");
+            return new String[0];
+        } else {
+            LOG.info("Returning instance paths for feed {}", instancePaths[1]);
+            return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java b/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
new file mode 100644
index 0000000..4fa38eb
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/retention/EvictedInstanceSerDeTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Unit test for EvictedInstanceSerDe.
+ */
+public class EvictedInstanceSerDeTest {
+
+    private EmbeddedCluster cluster;
+    private FileSystem fs;
+    private Path csvFilePath;
+    private StringBuffer evictedInstancePaths = new StringBuffer(
+            "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010")
+            .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR)
+            .append("thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011");
+
+    @BeforeClass
+    public void start() throws Exception {
+        cluster = EmbeddedCluster.newCluster("test");
+        String hdfsUrl = cluster.getConf().get("fs.default.name");
+
+        fs = FileSystem.get(cluster.getConf());
+        csvFilePath = new Path(hdfsUrl + "/falcon/staging/feed/instancePaths-2014-10-01-01-00.csv");
+    }
+
+    @AfterClass
+    public void close() throws Exception {
+        cluster.shutdown();
+    }
+
+    @Test
+    public void testSerializeEvictedInstancePathsForNoEviction() throws Exception {
+        EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, new StringBuffer());
+
+        Assert.assertEquals(readLogFile(csvFilePath),
+                EvictedInstanceSerDe.INSTANCEPATH_PREFIX);
+    }
+
+    @Test
+    public void testSerializeEvictedInstancePathsWithEviction() throws Exception {
+        EvictedInstanceSerDe.serializeEvictedInstancePaths(fs, csvFilePath, evictedInstancePaths);
+        Assert.assertEquals(readLogFile(csvFilePath), evictedInstancePaths.toString());
+    }
+
+    @Test(dependsOnMethods = "testSerializeEvictedInstancePathsForNoEviction")
+    public void testDeserializeEvictedInstancePathsForNoEviction() throws Exception {
+        String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath);
+        Assert.assertEquals(instancePaths.length, 0);
+    }
+
+    @Test(dependsOnMethods = "testSerializeEvictedInstancePathsWithEviction")
+    public void testDeserializeEvictedInstancePathsWithEviction() throws Exception {
+        String[] instancePaths = EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, csvFilePath);
+        Assert.assertEquals(instancePaths.length, 2);
+        Assert.assertTrue(instancePaths[0].equals(
+                "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2010"));
+        Assert.assertTrue(instancePaths[1].equals(
+                "thrift://falcon-distcp-1.cs1cloud.internal:9083/default/retention_hours_7/year=2011"));
+
+    }
+
+    private String readLogFile(Path logFile) throws IOException {
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream date = fs.open(logFile);
+        IOUtils.copyBytes(date, writer, 4096, true);
+        return writer.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index a60e951..629e6a5 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -68,17 +68,13 @@ public class JMSMessageProducer {
         this.messageType = messageType;
     }
 
-    public boolean isFalconEntityTopic() {
-        return messageType == MessageType.FALCON;
-    }
-
     // convention over configuration
     public String getTopicName() {
         String topicNameValue = context.getValue(WorkflowExecutionArgs.TOPIC_NAME);
         return topicNameValue != null
                 ? topicNameValue  // return if user has set a topic
                 : FALCON_TOPIC_PREFIX // else falcon entity topic or user = FALCON.$entity_name
-                        + (messageType == MessageType.FALCON ? ENTITY_TOPIC_NAME : context.getEntityName());
+                + (messageType == MessageType.FALCON ? ENTITY_TOPIC_NAME : context.getEntityName());
     }
 
     public String getBrokerImplClass() {
@@ -180,7 +176,7 @@ public class JMSMessageProducer {
     }
 
     private List<Map<String, String>> buildMessageList(WorkflowExecutionArgs[] filteredArgs) {
-        String[] feedNames = getFeedNames();
+        String[] feedNames = context.getOutputFeedNamesList();
         if (feedNames == null) {
             return Collections.emptyList();
         }
@@ -252,26 +248,7 @@ public class JMSMessageProducer {
         message.put(key.getName(), value);
     }
 
-    private String[] getFeedNames() {
-        String feedNameStr = context.getOutputFeedNames();
-        if (isFalconEntityTopic()) {
-            return new String[]{feedNameStr};
-        }
-
-        if (feedNameStr.equals("null")) {
-            return null;
-        }
-
-        return context.getOutputFeedNamesList();
-    }
-
     private String[] getFeedPaths() throws IOException {
-
-        if (isFalconEntityTopic()) {
-            LOG.debug("Returning instance paths for Falcon Topic: " + context.getOutputFeedInstancePaths());
-            return new String[]{context.getOutputFeedInstancePaths(), };
-        }
-
         WorkflowExecutionContext.EntityOperations operation = context.getOperation();
         if (operation == WorkflowExecutionContext.EntityOperations.GENERATE
                 || operation == WorkflowExecutionContext.EntityOperations.REPLICATE) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index bf8615f..490292f 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -70,6 +70,8 @@ public class JMSMessageProducerTest {
     @Test
     public void testWithFeedOutputPaths() throws Exception {
         List<String> args = createCommonArgs();
+        String[] outputFeedNames = {"click-logs", "raw-logs", };
+        String[] outputFeeedPaths = {"/click-logs/10/05/05/00/20", "/raw-logs/10/05/05/00/20", };
         List<String> newArgs = new ArrayList<String>(Arrays.asList(
                 "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), "agg-coord",
                 "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), "click-logs,raw-logs",
@@ -79,13 +81,15 @@ public class JMSMessageProducerTest {
         List<String[]> messages = new ArrayList<String[]>();
         messages.add(args.toArray(new String[args.size()]));
         testProcessMessageCreator(messages, TOPIC_NAME);
+        int index = 0;
         for (MapMessage m : mapMessages) {
             assertMessage(m);
             Assert.assertTrue((m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName())
-                    .equals("click-logs,raw-logs")));
+                    .equals(outputFeedNames[index])));
             Assert.assertTrue(m
                     .getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName())
-                    .equals("/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20"));
+                    .equals(outputFeeedPaths[index]));
+            ++index;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/oozie/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/feed/eviction-action.xml b/oozie/src/main/resources/action/feed/eviction-action.xml
index 6d03eb0..64d5793 100644
--- a/oozie/src/main/resources/action/feed/eviction-action.xml
+++ b/oozie/src/main/resources/action/feed/eviction-action.xml
@@ -48,7 +48,7 @@
         <arg>-frequency</arg>
         <arg>${frequency}</arg>
         <arg>-logFile</arg>
-        <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
     </java>
     <ok to="succeeded-post-processing"/>
     <error to="failed-post-processing"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 979d4f0..440a131 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -63,7 +63,7 @@
         <arg>-feedInstancePaths</arg>
         <arg>${feedInstancePaths}</arg>
         <arg>-logFile</arg>
-        <arg>${logDir}/instancePaths-${nominalTime}.csv</arg>
+        <arg>${logDir}/job-${nominalTime}/${wf:run()}/evicted-instancePaths.csv</arg>
         <arg>-workflowEngineUrl</arg>
         <arg>${workflowEngineUrl}</arg>
         <arg>-subflowId</arg>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 87d5e8a..2787d7f 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -130,17 +130,9 @@ public class FalconPostProcessingTest {
         System.out.println("Consumed: " + m.toString());
 
         assertMessage(m);
-        if (topic.equals(FALCON_TOPIC_NAME)) {
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()),
-                    "out-click-logs,out-raw-logs");
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                    "/out-click-logs/10/05/05/00/20,/out-raw-logs/10/05/05/00/20");
-        } else {
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "out-click-logs");
-            Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
-                    "/out-click-logs/10/05/05/00/20");
-        }
-
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "out-click-logs");
+        Assert.assertEquals(m.getString(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName()),
+                "/out-click-logs/10/05/05/00/20");
         connection.close();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 9589edf..f7a4493 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -179,7 +179,7 @@ public class FeedEvictor extends Configured implements Tool {
         Path feedBasePath = getFeedBasePath(feedPath);
         for (Path path : toBeDeleted) {
             deleteInstance(fs, path, feedBasePath);
-            Date date = getDate(path, feedPath, dateMask, timeZone);
+            Date date = getDate(new Path(path.toUri().getPath()), feedPath, dateMask, timeZone);
             buffer.append(dateFormat.format(date)).append(',');
             instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         }
@@ -218,7 +218,7 @@ public class FeedEvictor extends Configured implements Tool {
             LOG.debug("Considering {}", file.getPath().toUri().getPath());
             LOG.debug("Date: {}", date);
             if (date != null && !isDateInRange(date, start)) {
-                toBeDeleted.add(new Path(file.getPath().toUri().getPath()));
+                toBeDeleted.add(file.getPath());
             }
         }
         return toBeDeleted;
@@ -533,11 +533,32 @@ public class FeedEvictor extends Configured implements Tool {
                 String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
                 LOG.info("Deleted partition: " + partitionInfo);
                 buffer.append(partSpec).append(',');
-                instancePaths.append(partitionInfo).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
+                instancePaths.append(getEvictedPartitionPath(storage, partitionToDrop))
+                        .append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
             }
         }
     }
 
+    private static String getEvictedPartitionPath(final CatalogStorage storage,
+                                                  final CatalogPartition partitionToDrop) {
+        String uriTemplate = storage.getUriTemplate(); // no need for location type for table
+        List<String> values = partitionToDrop.getValues();
+        StringBuilder partitionPath = new StringBuilder();
+        int index = 0;
+        for (String partitionKey : storage.getDatedPartitionKeys()) {
+            String dateMask = storage.getPartitionValue(partitionKey);
+            String date = values.get(index);
+
+            partitionPath.append(uriTemplate.replace(dateMask, date));
+            partitionPath.append(CatalogStorage.PARTITION_SEPARATOR);
+            LOG.info("partitionPath: " + partitionPath);
+        }
+        partitionPath.setLength(partitionPath.length() - 1);
+
+        LOG.info("Return partitionPath: " + partitionPath);
+        return partitionPath.toString();
+    }
+
     private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException {
         if (feedBasePath.equals(parent)) {
             LOG.info("Not deleting feed base path: {}", parent);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
----------------------------------------------------------------------
diff --git a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
index eb4173e..b2f4821 100644
--- a/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
+++ b/retention/src/test/java/org/apache/falcon/retention/FeedEvictorTest.java
@@ -122,7 +122,7 @@ public class FeedEvictorTest {
             assertFailures(fs, pair);
             compare(map.get("feed1"), stream.getBuffer());
 
-            String expectedInstancePaths = getExpectedInstancePaths(dataPath.replaceAll(storageUrl, ""));
+            String expectedInstancePaths = getExpectedInstancePaths(dataPath);
             Assert.assertEquals(readLogFile(new Path(logFile)), expectedInstancePaths);
 
             String deletedPath = expectedInstancePaths.split(",")[0].split("=")[1];
@@ -222,7 +222,7 @@ public class FeedEvictorTest {
             compare(map.get("feed2"), stream.getBuffer());
 
             Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(dataPath.replaceAll(storageUrl, "")));
+                    getExpectedInstancePaths(dataPath));
 
         } catch (Exception e) {
             Assert.fail("Unknown exception", e);
@@ -356,7 +356,7 @@ public class FeedEvictorTest {
             assertFailures(fs, pair);
 
             Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(dataPath.replaceAll(storageUrl, "")));
+                    getExpectedInstancePaths(dataPath));
 
         } catch (Exception e) {
             Assert.fail("Unknown exception", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/88d92488/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 894a194..de6f782 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -545,10 +545,10 @@ public class TableStorageFeedEvictorIT {
 
     // instance paths could be deleted in any order; compare the list of evicted paths
     private void validateInstancePaths(String actualInstancesEvicted, String expectedInstancePaths) {
-        String[] actualEvictedPathStr = actualInstancesEvicted.split("=");
-        String[] expectedEvictedPathStr = expectedInstancePaths.split("=");
-        if (actualEvictedPathStr.length == 1) {
-            Assert.assertEquals(expectedEvictedPathStr.length, 1);
+        String[] actualEvictedPathStr = actualInstancesEvicted.split("instancePaths=");
+        String[] expectedEvictedPathStr = expectedInstancePaths.split("instancePaths=");
+        if (actualEvictedPathStr.length == 0) {
+            Assert.assertEquals(expectedEvictedPathStr.length, 0);
         } else {
             Assert.assertEquals(actualEvictedPathStr.length, 2);
             Assert.assertEquals(expectedEvictedPathStr.length, 2);