You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/08 16:40:21 UTC

[05/15] falcon git commit: FALCON-1321 Add Entity Lineage Test. Contributed by Pragya Mittal.

FALCON-1321 Add Entity Lineage Test. Contributed by Pragya Mittal.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/63cf451a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/63cf451a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/63cf451a

Branch: refs/heads/0.7
Commit: 63cf451a7fe990d208defd03859c8083c8ee1168
Parents: 8de399c
Author: Ajay Yadava <aj...@gmail.com>
Authored: Mon Aug 3 15:20:14 2015 +0530
Committer: Ajay Yadav <aj...@inmobi.com>
Committed: Sat Aug 8 20:06:40 2015 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../regression/Entities/ProcessMerlin.java      |  48 ++++
 .../helpers/entity/AbstractEntityHelper.java    |  17 ++
 .../core/response/ServiceResponse.java          |  12 +
 .../regression/core/util/EntityLineageUtil.java |  65 ++++++
 .../falcon/regression/core/util/Util.java       |  21 ++
 .../org/apache/falcon/request/RequestKeys.java  |   1 +
 .../regression/lineage/EntityLineageTest.java   | 221 +++++++++++++++++++
 8 files changed, 387 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 73ce75b..1a300d5 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1321 Add Entity Lineage Test (Pragya Mittal via Ajay Yadava)
+
    FALCON-1319 Contribute HiveDr, Mirror tests and some test fixes (Namit Maheshwari, Paul Isaychuk,
    Raghav Kumar Gautam & Ruslan Ostafiychuk via Raghav Kumar Gautam)
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
index ae5c70c..b905bee 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java
@@ -626,6 +626,54 @@ public class ProcessMerlin extends Process {
         draft.setWorkflow(null, null, null);
         return draft;
     }
+
+    /**
+     * Replaces old input by new input.
+     */
+    public void resetInputFeed(String inputName, String feedName) {
+        Input in1 = getInputs().getInputs().get(0);
+        getInputs().getInputs().clear();
+        Input in2 = new Input();
+        in2.setEnd(in1.getEnd());
+        in2.setFeed(feedName);
+        in2.setName(inputName);
+        in2.setPartition(in1.getPartition());
+        in2.setStart(in1.getStart());
+        in2.setOptional(in1.isOptional());
+        getInputs().getInputs().add(in2);
+    }
+
+    /**
+     * Replaces old output by new output.
+     */
+    public void resetOutputFeed(String outputName, String feedName) {
+        Output out1 = getOutputs().getOutputs().get(0);
+        getOutputs().getOutputs().clear();
+        Output out2 = new Output();
+        out2.setFeed(feedName);
+        out2.setName(outputName);
+        out2.setInstance(out1.getInstance());
+        getOutputs().getOutputs().add(out2);
+    }
+
+    /**
+     * Adds array of feeds as input.
+     */
+    public void addInputFeeds(String[] ipFeed) {
+        for(int i=0; i<ipFeed.length; i++){
+            addInputFeed(ipFeed[i], ipFeed[i]);
+        }
+    }
+
+    /**
+     * Adds array of feeds as output.
+     */
+    public void addOutputFeeds(String[] opFeed) {
+        for(int i=0; i<opFeed.length; i++){
+            addOutputFeed(opFeed[i], opFeed[i]);
+        }
+    }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
index 7b8d111..2baa35f 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/helpers/entity/AbstractEntityHelper.java
@@ -669,4 +669,21 @@ public abstract class AbstractEntityHelper {
                 entityName + colo);
         return Util.sendRequest(url, "post", data, user);
     }
+
+
+    /**
+     * Retrieves entities lineage.
+     * @param params list of optional parameters
+     * @return entity lineage for the given pipeline.
+     */
+    public ServiceResponse getEntityLineage(String params)
+        throws URISyntaxException, AuthenticationException, InterruptedException, IOException {
+
+        String url = createUrl(this.hostname + URLS.ENTITY_LINEAGE.getValue(), colo);
+        if (StringUtils.isNotEmpty(params)){
+            url += colo.isEmpty() ? "?" + params : "&" + params;
+        }
+        return Util.sendRequestLineage(createUrl(url), "get", null, null);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
index 175bb98..55e862c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/ServiceResponse.java
@@ -18,11 +18,13 @@
 
 package org.apache.falcon.regression.core.response;
 
+import com.google.gson.GsonBuilder;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.resource.EntityList;
 import org.apache.falcon.resource.EntitySummaryResult;
+import org.apache.falcon.resource.LineageGraphResult;
 import org.apache.http.HttpResponse;
 import org.apache.log4j.Logger;
 
@@ -109,4 +111,14 @@ public class ServiceResponse {
             return null;
         }
     }
+
+    /**
+     * Retrieves LineageGraphResult from a message if possible.
+     * @return LineageGraphResult
+     */
+    public LineageGraphResult getLineageGraphResult() {
+        LineageGraphResult lineageGraphResult = new GsonBuilder().create().fromJson(message, LineageGraphResult.class);
+        return lineageGraphResult;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java
new file mode 100644
index 0000000..fc42cf5
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/EntityLineageUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.falcon.resource.LineageGraphResult;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+
+/**
+ *Util function related to entity lineage.
+ */
+public final class EntityLineageUtil{
+
+    private static final Logger LOGGER = Logger.getLogger(EntityLineageUtil.class);
+
+    private EntityLineageUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /**
+     * Validates entity lineage results.
+     * @param lineageGraphResult entity lineage result
+     * @param expectedVertices array of expected vertices
+     * @param expectedEdgeArray array of expected edges
+     */
+    public static void validateLineageGraphResult(LineageGraphResult lineageGraphResult, String[] expectedVertices,
+                                                  LineageGraphResult.Edge[] expectedEdgeArray) {
+        String[] actualVertices;
+        LineageGraphResult.Edge[] actualEdgeArray;
+        actualVertices = lineageGraphResult.getVertices();
+        actualEdgeArray = lineageGraphResult.getEdges();
+
+        Set<LineageGraphResult.Edge> expectedEdgeSet = new HashSet<>(Arrays.asList(expectedEdgeArray));
+        Set<LineageGraphResult.Edge> actualEdgeSet = new HashSet<>(Arrays.asList(actualEdgeArray));
+
+        Set<String> expectedVerticesSet = new HashSet<>(Arrays.asList(expectedVertices));
+        Set<String> actualVerticesSet = new HashSet<>(Arrays.asList(actualVertices));
+
+        Assert.assertEquals(expectedEdgeSet, actualEdgeSet, "Edges dont match");
+        Assert.assertEquals(expectedVerticesSet, actualVerticesSet, "Vertices dont match");
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
index c220215..6c8d4ee 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Util.java
@@ -382,6 +382,7 @@ public final class Util {
         STATUS_URL("/api/entities/status"),
         ENTITY_SUMMARY("/api/entities/summary"),
         SUBMIT_AND_SCHEDULE_URL("/api/entities/submitAndSchedule"),
+        ENTITY_LINEAGE("/api/metadata/lineage/entities"),
         INSTANCE_RUNNING("/api/instance/running"),
         INSTANCE_STATUS("/api/instance/status"),
         INSTANCE_KILL("/api/instance/kill"),
@@ -559,4 +560,24 @@ public final class Util {
             return className;
         }
     }
+
+    /**
+     * Sends api requests.
+     * @param url target url
+     * @param method request method
+     * @param data data to be places in body of request
+     * @param user user to be used to send request
+     * @return api response
+     * @throws IOException
+     * @throws URISyntaxException
+     * @throws AuthenticationException
+     */
+    public static ServiceResponse sendRequestLineage(String url, String method, String data, String user)
+        throws IOException, URISyntaxException, AuthenticationException, InterruptedException {
+        BaseRequest request = new BaseRequest(url, method, user, data);
+        request.addHeader(RequestKeys.CONTENT_TYPE_HEADER, RequestKeys.JSON_CONTENT_TYPE);
+        HttpResponse response = request.run();
+        return new ServiceResponse(response);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java
index b2e38b2..05d5fc6 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/request/RequestKeys.java
@@ -26,6 +26,7 @@ public final class RequestKeys {
 
     public static final String CONTENT_TYPE_HEADER = "Content-Type";
     public static final String XML_CONTENT_TYPE = "text/xml";
+    public static final String JSON_CONTENT_TYPE = "application/json";
 
     public static final String AUTH_COOKIE = "hadoop.auth";
     public static final String AUTH_COOKIE_EQ = AUTH_COOKIE + "=";

http://git-wip-us.apache.org/repos/asf/falcon/blob/63cf451a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java
new file mode 100644
index 0000000..52cb198
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntityLineageTest.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.lineage;
+
+import org.apache.falcon.Pair;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.regression.Entities.FeedMerlin;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.Generator;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.EntityLineageUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.falcon.resource.LineageGraphResult;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Test Suite for Entity lineage.
+ */
+@Test(groups = "embedded")
+public class EntityLineageTest extends BaseTestClass {
+
+    private String baseTestDir = cleanAndGetTestDir();
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private ColoHelper cluster = servers.get(0);
+    private static final Logger LOGGER = Logger.getLogger(EntityLineageTest.class);
+
+    private int numInputFeeds;
+    private String startTime = "2015-06-06T09:37Z";
+    private String endTime = "2015-06-06T09:45Z";
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        LOGGER.info("Time range between : " + startTime + " and " + endTime);
+        uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle(this);
+        bundles[0].submitClusters(prism);
+
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].setProcessPeriodicity(10, TimeUnit.minutes);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+
+    @Test(groups = {"singleCluster"})
+    public void lineageTestFail() throws Exception {
+        String failedPipelineName = "pipeline2";
+        ServiceResponse response = prism.getProcessHelper().getEntityLineage("pipeline=" + failedPipelineName);
+        AssertUtil.assertFailed(response, "No processes belonging to pipeline " + failedPipelineName);
+    }
+
+    @Test(groups = {"singleCluster"})
+    public void lineageTestPass() throws Exception {
+
+        String pipelineName = "pipeline1";
+
+        FeedMerlin inputMerlin = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        inputMerlin.setValidity("2014-01-01T01:00Z", "2016-12-12T22:00Z");
+        inputMerlin.setFrequency(new Frequency("5", TimeUnit.minutes));
+        numInputFeeds = 10;
+
+        FeedMerlin[] feedMerlins = generateFeeds(numInputFeeds, inputMerlin,
+                                   Generator.getNameGenerator("Feed", ""),
+                                   Generator.getHadoopPathGenerator(baseTestDir, MINUTE_DATE_PATTERN));
+
+        Pair<String[], String[]>[] input = new Pair[]{
+            new Pair((new String[]{"Feed001"}), new String[]{"Feed002", "Feed005"}),
+            new Pair(new String[]{"Feed002"}, new String[]{"Feed003"}),
+            new Pair(new String[]{"Feed002"}, new String[]{"Feed004"}),
+            new Pair(new String[]{"Feed003", "Feed004", "Feed005"}, new String[]{"Feed001", "Feed006"}),
+            new Pair(new String[]{"Feed006"}, new String[]{"Feed007"}),
+            new Pair(new String[]{"Feed008", "Feed010"}, new String[]{"Feed009"}),
+            new Pair(new String[]{"Feed009"}, new String[]{"Feed010"}),
+        };
+
+        List<Pair<String[], String[]>> processFeed = Arrays.asList(input);
+        List<ProcessMerlin> processMerlins = new ArrayList<>();
+        Generator nameGenerator=Generator.getNameGenerator("Process", "");
+
+        for(Integer i=0; i<processFeed.size(); i++){
+            ProcessMerlin process = createProcess(nameGenerator.generate().replace("-", ""), pipelineName,
+                            processFeed.get(i).first, processFeed.get(i).second);
+            processMerlins.add(i, process);
+        }
+
+        deleteProcess(processMerlins);
+        deleteFeed(Arrays.asList(feedMerlins));
+
+        for (FeedMerlin feed : feedMerlins) {
+            AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feed.toString()));
+        }
+
+        for(ProcessMerlin process : processMerlins) {
+            AssertUtil.assertSucceeded(prism.getProcessHelper().submitEntity(process.toString()));
+        }
+
+        LineageGraphResult lineageGraphResult = prism.getProcessHelper().getEntityLineage("pipeline=" + pipelineName)
+                .getLineageGraphResult();
+
+        LOGGER.info("LineageGraphResult : " + lineageGraphResult.toString());
+
+        validateLineage(lineageGraphResult);
+
+        deleteProcess(processMerlins);
+        deleteFeed(Arrays.asList(feedMerlins));
+    }
+
+    public ProcessMerlin createProcess(String processName, String pipelineName, String[] inputFeed,
+                                       String[] outputFeed) throws URISyntaxException, AuthenticationException,
+                                       InterruptedException, IOException, JAXBException {
+        ProcessMerlin processMerlin = new ProcessMerlin(bundles[0].getProcessObject().toString());
+        processMerlin.setName(processName);
+        processMerlin.setPipelineTag(pipelineName);
+        setProcessData(processMerlin, inputFeed, outputFeed);
+        return processMerlin;
+    }
+
+    private void setProcessData(ProcessMerlin processMerlin, String[] ipName, String[] opName) {
+        processMerlin.resetInputFeed(ipName[0], ipName[0]);
+        processMerlin.resetOutputFeed(opName[0], opName[0]);
+
+        if (ipName.length > 1) {
+            String[] ipFeed = new String[ipName.length -1];
+            System.arraycopy(ipName, 1, ipFeed, 0, ipName.length - 1);
+            processMerlin.addInputFeeds(ipFeed);
+        }
+        if (opName.length > 1) {
+            String[] opFeed = new String[opName.length - 1];
+            System.arraycopy(opName, 1, opFeed, 0, opName.length - 1);
+            processMerlin.addOutputFeeds(opFeed);
+        }
+    }
+
+    public static FeedMerlin[] generateFeeds(final int numInputFeeds,
+                                             final FeedMerlin originalFeedMerlin,
+                                             final Generator nameGenerator,
+                                             final Generator pathGenerator) {
+        FeedMerlin[] inputFeeds = new FeedMerlin[numInputFeeds];
+        //submit all input feeds
+        for(int count = 0; count < numInputFeeds; ++count) {
+            final FeedMerlin feed = new FeedMerlin(originalFeedMerlin.toString());
+            feed.setName(nameGenerator.generate().replace("-", ""));
+            feed.setLocation(LocationType.DATA, pathGenerator.generate());
+            inputFeeds[count] = feed;
+        }
+        return inputFeeds;
+    }
+
+    private void deleteProcess(List<ProcessMerlin> processMerlins) throws InterruptedException, IOException,
+            URISyntaxException, JAXBException, AuthenticationException {
+        for(ProcessMerlin process : processMerlins) {
+            AssertUtil.assertSucceeded(prism.getProcessHelper().delete(process.toString()));
+        }
+    }
+
+    private void deleteFeed(List<FeedMerlin> feedMerlins) throws InterruptedException, IOException,
+            URISyntaxException, JAXBException, AuthenticationException {
+        for(FeedMerlin feed : feedMerlins) {
+            AssertUtil.assertSucceeded(prism.getFeedHelper().delete(feed.toString()));
+        }
+    }
+
+    private void validateLineage(LineageGraphResult lineageGraphResult) {
+        String[] expectedVertices = {"Process001", "Process002", "Process003", "Process004", "Process005",
+                                     "Process006", "Process007", };
+        LineageGraphResult.Edge[] expectedEdgeArray = new LineageGraphResult.Edge[lineageGraphResult.getEdges().length];
+        expectedEdgeArray[0] = new LineageGraphResult.Edge("Process004", "Process005", "Feed006");
+        expectedEdgeArray[1] = new LineageGraphResult.Edge("Process006", "Process007", "Feed009");
+        expectedEdgeArray[2] = new LineageGraphResult.Edge("Process004", "Process001", "Feed001");
+        expectedEdgeArray[3] = new LineageGraphResult.Edge("Process002", "Process004", "Feed003");
+        expectedEdgeArray[4] = new LineageGraphResult.Edge("Process007", "Process006", "Feed010");
+        expectedEdgeArray[5] = new LineageGraphResult.Edge("Process001", "Process002", "Feed002");
+        expectedEdgeArray[6] = new LineageGraphResult.Edge("Process001", "Process003", "Feed002");
+        expectedEdgeArray[7] = new LineageGraphResult.Edge("Process001", "Process004", "Feed005");
+        expectedEdgeArray[8] = new LineageGraphResult.Edge("Process003", "Process004", "Feed004");
+
+        EntityLineageUtil.validateLineageGraphResult(lineageGraphResult, expectedVertices, expectedEdgeArray);
+    }
+}
+