You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/03/06 01:49:40 UTC

[4/6] FALCON-288 Persist lineage information into a persistent store. Contributed by Venkatesh Seetharam

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
new file mode 100644
index 0000000..7045636
--- /dev/null
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -0,0 +1,731 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.GraphQuery;
+import com.tinkerpop.blueprints.KeyIndexableGraph;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfaces;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.feed.CatalogTable;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Input;
+import org.apache.falcon.entity.v0.process.Inputs;
+import org.apache.falcon.entity.v0.process.Output;
+import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Test for Metadata relationship mapping service.
+ */
+public class MetadataMappingServiceTest {
+
+    public static final String FALCON_USER = "falcon-user";
+    private static final String LOGS_DIR = "target/log";
+    private static final String NOMINAL_TIME = "2014-01-01-01-00";
+    public static final String OPERATION = "GENERATE";
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+    public static final String COLO_NAME = "west-coast";
+    public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String WORKFLOW_VERSION = "1.0.9";
+
+    public static final String INPUT_FEED_NAMES = "impression-feed,clicks-feed";
+    public static final String INPUT_INSTANCE_PATHS =
+        "jail://global:00/falcon/impression-feed/20140101,jail://global:00/falcon/clicks-feed/20140101";
+
+    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
+    public static final String OUTPUT_INSTANCE_PATHS =
+        "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+
+    private ConfigurationStore configStore;
+    private MetadataMappingService service;
+
+    private Cluster clusterEntity;
+    private Cluster bcpCluster;
+    private List<Feed> inputFeeds = new ArrayList<Feed>();
+    private List<Feed> outputFeeds = new ArrayList<Feed>();
+    private Process processEntity;
+
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        CurrentUser.authenticate(FALCON_USER);
+
+        configStore = ConfigurationStore.get();
+
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
+        service = new MetadataMappingService();
+        service.init();
+
+        Set<String> vertexPropertyKeys = service.getVertexIndexedKeys();
+        System.out.println("Got vertex property keys: " + vertexPropertyKeys);
+
+        Set<String> edgePropertyKeys = service.getEdgeIndexedKeys();
+        System.out.println("Got edge property keys: " + edgePropertyKeys);
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        GraphUtils.dump(service.getGraph(), System.out);
+
+        cleanupGraphStore(service.getGraph());
+        cleanupConfigurationStore(configStore);
+        service.destroy();
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+    }
+
+    @AfterMethod
+    public void printGraph() throws Exception {
+        GraphUtils.dump(service.getGraph());
+    }
+
+    private GraphQuery getQuery() {
+        return service.getGraph().query();
+    }
+
+    @Test
+    public void testGetName() throws Exception {
+        Assert.assertEquals(service.getName(), MetadataMappingService.SERVICE_NAME);
+    }
+
+    @Test
+    public void testOnAddClusterEntity() throws Exception {
+        clusterEntity = buildCluster(CLUSTER_ENTITY_NAME, COLO_NAME, "classification=production");
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+
+        verifyEntityWasAddedToGraph(CLUSTER_ENTITY_NAME, EntityRelationshipGraphBuilder.CLUSTER_ENTITY_TYPE);
+        verifyClusterEntityEdges();
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 3); // +3 = cluster, colo, tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 2); // +2 = cluster to colo and tag
+    }
+
+    @Test (dependsOnMethods = "testOnAddClusterEntity")
+    public void testOnAddFeedEntity() throws Exception {
+        Feed impressionsFeed = buildFeed("impression-feed", clusterEntity, "classified-as=Secure", "analytics",
+                Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, impressionsFeed);
+        inputFeeds.add(impressionsFeed);
+        verifyEntityWasAddedToGraph(impressionsFeed.getName(), EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+        verifyFeedEntityEdges(impressionsFeed.getName());
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
+
+        Feed clicksFeed = buildFeed("clicks-feed", clusterEntity, "classified-as=Secure,classified-as=Financial",
+                "analytics", Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, clicksFeed);
+        inputFeeds.add(clicksFeed);
+        verifyEntityWasAddedToGraph(clicksFeed.getName(),
+                EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 9); // feed and financial vertex
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 11); // +5 = cluster + user + 2Group + Tag
+
+        Feed join1Feed = buildFeed("imp-click-join1", clusterEntity, "classified-as=Financial", "reporting,bi",
+                Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, join1Feed);
+        outputFeeds.add(join1Feed);
+        verifyEntityWasAddedToGraph(join1Feed.getName(),
+                EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 12); // + 3 = 1 feed and 2 groups
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 16); // +5 = cluster + user + Group + 2Tags
+
+        Feed join2Feed = buildFeed("imp-click-join2", clusterEntity, "classified-as=Secure,classified-as=Financial",
+                "reporting,bi", Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, join2Feed);
+        outputFeeds.add(join2Feed);
+        verifyEntityWasAddedToGraph(join2Feed.getName(), EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 13); // +1 feed
+        // +6 = user + 2tags + 2Groups + Cluster
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 22);
+    }
+
+    @Test (dependsOnMethods = "testOnAddFeedEntity")
+    public void testOnAddProcessEntity() throws Exception {
+        processEntity = buildProcess(PROCESS_ENTITY_NAME, clusterEntity, "classified-as=Critical");
+        addWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+        for (Feed inputFeed : inputFeeds) {
+            addInput(processEntity, inputFeed);
+        }
+
+        for (Feed outputFeed : outputFeeds) {
+            addOutput(processEntity, outputFeed);
+        }
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+
+        verifyEntityWasAddedToGraph(processEntity.getName(), EntityRelationshipGraphBuilder.PROCESS_ENTITY_TYPE);
+        verifyProcessEntityEdges();
+
+        // +2 = 1 process + 1 tag
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 15);
+        // +7 = user,tag,cluster, 2 inputs,2 outputs
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 29);
+    }
+
+    @Test (dependsOnMethods = "testOnAddProcessEntity")
+    public void testOnAdd() throws Exception {
+        verifyEntityGraph(EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE, "Secure");
+    }
+
+    @Test(dependsOnMethods = "testOnAdd")
+    public void testMapLineage() throws Exception {
+
+        LineageRecorder.main(getTestMessageArgs());
+
+        service.mapLineage(PROCESS_ENTITY_NAME, OPERATION, LOGS_DIR);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        verifyLineageGraph(InstanceRelationshipGraphBuilder.FEED_INSTANCE_TYPE);
+
+        // +6 = 1 process, 2 inputs,2 outputs
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 20);
+        //+32 = +26 for feed instances + 6 for process instance
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 61);
+    }
+
+    @Test (dependsOnMethods = "testMapLineage")
+    public void testOnChange() throws Exception {
+        // cannot modify cluster, adding a new cluster
+        bcpCluster = buildCluster("bcp-cluster", "east-coast", "classification=bcp");
+        configStore.publish(EntityType.CLUSTER, bcpCluster);
+        verifyEntityWasAddedToGraph("bcp-cluster", EntityRelationshipGraphBuilder.CLUSTER_ENTITY_TYPE);
+
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 23); // +3 = cluster, colo, tag
+        // +2 edges to above, no user but only to colo and new tag
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 63);
+    }
+
+    @Test(dependsOnMethods = "testOnChange")
+    public void testOnFeedEntityChange() throws Exception {
+        Feed oldFeed = inputFeeds.get(0);
+        Feed newFeed = buildFeed(oldFeed.getName(), clusterEntity,
+                "classified-as=Secured,source=data-warehouse", "reporting",
+                Storage.TYPE.FILESYSTEM, "jail://global:00/falcon/impression-feed/20140101");
+
+        try {
+            configStore.initiateUpdate(newFeed);
+
+            // add cluster
+            org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                    new org.apache.falcon.entity.v0.feed.Cluster();
+            feedCluster.setName(bcpCluster.getName());
+            newFeed.getClusters().getClusters().add(feedCluster);
+
+            configStore.update(EntityType.FEED, newFeed);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        verifyUpdatedEdges(newFeed);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 25); //+2 = 2 new tags
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 65); // +2 = 1 new cluster, 1 new tag
+    }
+
+    private void verifyUpdatedEdges(Feed newFeed) {
+        Vertex feedVertex = getEntityVertex(newFeed.getName(),
+                EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+
+        // groups
+        Edge edge = feedVertex.getEdges(Direction.OUT, RelationshipGraphBuilder.GROUPS_LABEL).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "reporting");
+
+        // tags
+        edge = feedVertex.getEdges(Direction.OUT, "classified-as").iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "Secured");
+        edge = feedVertex.getEdges(Direction.OUT, "source").iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), "data-warehouse");
+
+        // new cluster
+        Iterator<Edge> clusterEdgeIterator = feedVertex.getEdges(Direction.OUT,
+                RelationshipGraphBuilder.FEED_CLUSTER_EDGE_LABEL).iterator();
+        edge = clusterEdgeIterator.next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), clusterEntity.getName());
+        edge = clusterEdgeIterator.next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
+    }
+
+    @Test(dependsOnMethods = "testOnFeedEntityChange")
+    public void testOnProcessEntityChange() throws Exception {
+        Process oldProcess = processEntity;
+        Process newProcess = buildProcess(oldProcess.getName(), bcpCluster, null);
+        addWorkflow(newProcess, WORKFLOW_NAME, "2.0.0");
+        addInput(newProcess, inputFeeds.get(0));
+
+        try {
+            configStore.initiateUpdate(newProcess);
+            configStore.update(EntityType.PROCESS, newProcess);
+        } finally {
+            configStore.cleanupUpdateInit();
+        }
+
+        verifyUpdatedEdges(newProcess);
+        Assert.assertEquals(getVerticesCount(service.getGraph()), 25); // +0, no net new
+        Assert.assertEquals(getEdgesCount(service.getGraph()), 61); // -4 = -2 outputs, -1 tag, -1 cluster
+    }
+
+    private void verifyUpdatedEdges(Process newProcess) {
+        Vertex processVertex = getEntityVertex(newProcess.getName(),
+                EntityRelationshipGraphBuilder.PROCESS_ENTITY_TYPE);
+
+        // cluster
+        Edge edge = processVertex.getEdges(Direction.OUT,
+                RelationshipGraphBuilder.PROCESS_CLUSTER_EDGE_LABEL).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("name"), bcpCluster.getName());
+
+/*
+        // workflow
+        edge = processVertex.getEdges(Direction.OUT,
+                RelationshipGraphBuilder.PROCESS_WORKFLOW_EDGE_LABEL).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.IN).getProperty("version"),
+                newProcess.getWorkflow().getVersion());
+*/
+
+        // inputs
+        edge = processVertex.getEdges(Direction.IN,
+                RelationshipGraphBuilder.FEED_PROCESS_EDGE_LABEL).iterator().next();
+        Assert.assertEquals(edge.getVertex(Direction.OUT).getProperty("name"),
+                newProcess.getInputs().getInputs().get(0).getFeed());
+
+        // outputs
+        for (Edge e : processVertex.getEdges(Direction.OUT, RelationshipGraphBuilder.PROCESS_FEED_EDGE_LABEL)) {
+            Assert.fail("there should not be any edges to output feeds" + e);
+        }
+    }
+
+    public static void debug(final KeyIndexableGraph graph) {
+        System.out.println("*****Vertices of " + graph);
+        for (Vertex vertex : graph.getVertices()) {
+            System.out.println(GraphUtils.vertexString(vertex));
+        }
+
+        System.out.println("*****Edges of " + graph);
+        for (Edge edge : graph.getEdges()) {
+            System.out.println(GraphUtils.edgeString(edge));
+        }
+    }
+
+    private static Cluster buildCluster(String name, String colo, String tags) {
+        Cluster cluster = new Cluster();
+        cluster.setName(name);
+        cluster.setColo(colo);
+        cluster.setTags(tags);
+
+        Interfaces interfaces = new Interfaces();
+        cluster.setInterfaces(interfaces);
+
+        Interface storage = new Interface();
+        storage.setEndpoint("jail://global:00");
+        storage.setType(Interfacetype.WRITE);
+        cluster.getInterfaces().getInterfaces().add(storage);
+
+        return cluster;
+    }
+
+    private static Feed buildFeed(String feedName, Cluster cluster, String tags, String groups,
+                                  Storage.TYPE storageType, String uriTemplate) {
+        Feed feed = new Feed();
+        feed.setName(feedName);
+        feed.setTags(tags);
+        feed.setGroups(groups);
+        feed.setFrequency(Frequency.fromString("hours(1)"));
+
+        org.apache.falcon.entity.v0.feed.Clusters
+                clusters = new org.apache.falcon.entity.v0.feed.Clusters();
+        feed.setClusters(clusters);
+        org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+                new org.apache.falcon.entity.v0.feed.Cluster();
+        feedCluster.setName(cluster.getName());
+        clusters.getClusters().add(feedCluster);
+
+        addStorage(feed, storageType, uriTemplate);
+
+        return feed;
+    }
+
+    private static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            feed.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            feed.setTable(table);
+        }
+    }
+
+    private static Process buildProcess(String processName, Cluster cluster,
+                                        String tags) throws Exception {
+        Process processEntity = new Process();
+        processEntity.setName(processName);
+        processEntity.setTags(tags);
+
+        org.apache.falcon.entity.v0.process.Cluster processCluster =
+                new org.apache.falcon.entity.v0.process.Cluster();
+        processCluster.setName(cluster.getName());
+        processEntity.setClusters(new org.apache.falcon.entity.v0.process.Clusters());
+        processEntity.getClusters().getClusters().add(processCluster);
+
+        return processEntity;
+    }
+
+    private static void addWorkflow(Process process, String workflowName, String version) {
+        Workflow workflow = new Workflow();
+        workflow.setName(workflowName);
+        workflow.setVersion(version);
+        workflow.setEngine(EngineType.PIG);
+        workflow.setPath("/falcon/test/workflow");
+
+        process.setWorkflow(workflow);
+    }
+
+    private static void addInput(Process process, Feed feed) {
+        if (process.getInputs() == null) {
+            process.setInputs(new Inputs());
+        }
+
+        Inputs inputs = process.getInputs();
+        Input input = new Input();
+        input.setFeed(feed.getName());
+        inputs.getInputs().add(input);
+    }
+
+    private static void addOutput(Process process, Feed feed) {
+        if (process.getOutputs() == null) {
+            process.setOutputs(new Outputs());
+        }
+
+        Outputs outputs = process.getOutputs();
+        Output output = new Output();
+        output.setFeed(feed.getName());
+        outputs.getOutputs().add(output);
+    }
+
+    private void verifyEntityWasAddedToGraph(String entityName, String entityType) {
+        Vertex entityVertex = getEntityVertex(entityName, entityType);
+        Assert.assertNotNull(entityVertex);
+        verifyEntityProperties(entityVertex, entityName, entityType);
+    }
+
+    private void verifyEntityProperties(Vertex entityVertex, String entityName, String entityType) {
+        Assert.assertEquals(entityName, entityVertex.getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY));
+        Assert.assertEquals(entityType, entityVertex.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY));
+        Assert.assertNotNull(entityVertex.getProperty(EntityRelationshipGraphBuilder.TIMESTAMP_PROPERTY_KEY));
+    }
+
+    private void verifyClusterEntityEdges() {
+        Vertex clusterVertex = getEntityVertex(CLUSTER_ENTITY_NAME,
+                EntityRelationshipGraphBuilder.CLUSTER_ENTITY_TYPE);
+
+        // verify edge to user vertex
+        verifyVertexForEdge(clusterVertex, Direction.OUT, EntityRelationshipGraphBuilder.USER_LABEL,
+                FALCON_USER, EntityRelationshipGraphBuilder.USER_TYPE);
+        // verify edge to colo vertex
+        verifyVertexForEdge(clusterVertex, Direction.OUT, EntityRelationshipGraphBuilder.CLUSTER_COLO_LABEL,
+                COLO_NAME, EntityRelationshipGraphBuilder.COLO_TYPE);
+        // verify edge to tags vertex
+        verifyVertexForEdge(clusterVertex, Direction.OUT, "classification",
+                "production", EntityRelationshipGraphBuilder.TAGS_TYPE);
+    }
+
+    private void verifyFeedEntityEdges(String feedName) {
+        Vertex feedVertex = getEntityVertex(feedName, EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE);
+
+        // verify edge to cluster vertex
+        verifyVertexForEdge(feedVertex, Direction.OUT, EntityRelationshipGraphBuilder.FEED_CLUSTER_EDGE_LABEL,
+                CLUSTER_ENTITY_NAME, EntityRelationshipGraphBuilder.CLUSTER_ENTITY_TYPE);
+        // verify edge to user vertex
+        verifyVertexForEdge(feedVertex, Direction.OUT, EntityRelationshipGraphBuilder.USER_LABEL,
+                FALCON_USER, EntityRelationshipGraphBuilder.USER_TYPE);
+        // verify edge to tags vertex
+        verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
+                "Secure", EntityRelationshipGraphBuilder.TAGS_TYPE);
+        // verify edge to group vertex
+        verifyVertexForEdge(feedVertex, Direction.OUT, EntityRelationshipGraphBuilder.GROUPS_LABEL,
+                "analytics", EntityRelationshipGraphBuilder.GROUPS_TYPE);
+    }
+
+    private void verifyProcessEntityEdges() {
+        Vertex processVertex = getEntityVertex(PROCESS_ENTITY_NAME,
+                EntityRelationshipGraphBuilder.PROCESS_ENTITY_TYPE);
+
+        // verify edge to cluster vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, EntityRelationshipGraphBuilder.FEED_CLUSTER_EDGE_LABEL,
+                CLUSTER_ENTITY_NAME, EntityRelationshipGraphBuilder.CLUSTER_ENTITY_TYPE);
+        // verify edge to user vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, EntityRelationshipGraphBuilder.USER_LABEL,
+                FALCON_USER, EntityRelationshipGraphBuilder.USER_TYPE);
+        // verify edge to tags vertex
+        verifyVertexForEdge(processVertex, Direction.OUT, "classified-as",
+                "Critical", EntityRelationshipGraphBuilder.TAGS_TYPE);
+
+        // verify edges to inputs
+        List<String> actual = new ArrayList<String>();
+        for (Edge edge : processVertex.getEdges(Direction.IN,
+                EntityRelationshipGraphBuilder.FEED_PROCESS_EDGE_LABEL)) {
+            Vertex outVertex = edge.getVertex(Direction.OUT);
+            Assert.assertEquals(EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE,
+                    outVertex.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY));
+            actual.add(outVertex.<String>getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY));
+        }
+
+        Assert.assertTrue(actual.containsAll(Arrays.asList("impression-feed", "clicks-feed")),
+                "Actual does not contain expected: " + actual);
+
+        actual.clear();
+        // verify edges to outputs
+        for (Edge edge : processVertex.getEdges(Direction.OUT,
+                EntityRelationshipGraphBuilder.PROCESS_FEED_EDGE_LABEL)) {
+            Vertex outVertex = edge.getVertex(Direction.IN);
+            Assert.assertEquals(EntityRelationshipGraphBuilder.FEED_ENTITY_TYPE,
+                    outVertex.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY));
+            actual.add(outVertex.<String>getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY));
+        }
+        Assert.assertTrue(actual.containsAll(Arrays.asList("imp-click-join1", "imp-click-join2")),
+                "Actual does not contain expected: " + actual);
+    }
+
+    private Vertex getEntityVertex(String entityName, String entityType) {
+        GraphQuery entityQuery = getQuery()
+                .has(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY, entityName)
+                .has(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY, entityType);
+        Iterator<Vertex> iterator = entityQuery.vertices().iterator();
+        Assert.assertTrue(iterator.hasNext());
+
+        Vertex entityVertex = iterator.next();
+        Assert.assertNotNull(entityVertex);
+
+        return entityVertex;
+    }
+
+    private void verifyVertexForEdge(Vertex fromVertex, Direction direction, String label,
+                                     String expectedName, String expectedType) {
+        for (Edge edge : fromVertex.getEdges(direction, label)) {
+            Vertex outVertex = edge.getVertex(Direction.IN);
+            Assert.assertEquals(
+                    outVertex.getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY), expectedName);
+            Assert.assertEquals(
+                    outVertex.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY), expectedType);
+        }
+    }
+
+    private void verifyEntityGraph(String feedType, String classification) {
+        // feeds owned by a user
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType);
+        Assert.assertEquals(feedNamesOwnedByUser,
+                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", "imp-click-join2"));
+
+        // feeds classified as secure
+        verifyFeedsClassifiedAsSecure(feedType,
+                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2"));
+
+        // feeds owned by a user and classified as secure
+        verifyFeedsOwnedByUserAndClassification(feedType, classification,
+                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join2"));
+    }
+
+    private List<String> getFeedsOwnedByAUser(String feedType) {
+        GraphQuery userQuery = getQuery()
+                .has(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY, FALCON_USER)
+                .has(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY, EntityRelationshipGraphBuilder.USER_TYPE);
+
+        List<String> feedNames = new ArrayList<String>();
+        for (Vertex userVertex : userQuery.vertices()) {
+            for (Vertex feed : userVertex.getVertices(Direction.IN, EntityRelationshipGraphBuilder.USER_LABEL)) {
+                if (feed.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY).equals(feedType)) {
+                    System.out.println(FALCON_USER + " owns -> " + GraphUtils.vertexString(feed));
+                    feedNames.add(feed.<String>getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY));
+                }
+            }
+        }
+
+        return feedNames;
+    }
+
+    private void verifyFeedsClassifiedAsSecure(String feedType, List<String> expected) {
+        GraphQuery classQuery = getQuery()
+                .has(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY, "Secure")
+                .has(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY, EntityRelationshipGraphBuilder.TAGS_TYPE);
+
+        List<String> actual = new ArrayList<String>();
+        for (Vertex feedVertex : classQuery.vertices()) {
+            for (Vertex feed : feedVertex.getVertices(Direction.BOTH, "classified-as")) {
+                if (feed.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY).equals(feedType)) {
+                    System.out.println(" Secure classification -> " + GraphUtils.vertexString(feed));
+                    actual.add(feed.<String>getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY));
+                }
+            }
+        }
+
+        Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+    }
+
+    private void verifyFeedsOwnedByUserAndClassification(String feedType, String classification,
+                                                         List<String> expected) {
+        List<String> actual = new ArrayList<String>();
+        Vertex userVertex = getEntityVertex(FALCON_USER, EntityRelationshipGraphBuilder.USER_TYPE);
+        for (Vertex feed : userVertex.getVertices(Direction.IN, EntityRelationshipGraphBuilder.USER_LABEL)) {
+            if (feed.getProperty(EntityRelationshipGraphBuilder.TYPE_PROPERTY_KEY).equals(feedType)) {
+                for (Vertex classVertex : feed.getVertices(Direction.OUT, "classified-as")) {
+                    if (classVertex.getProperty(EntityRelationshipGraphBuilder.NAME_PROPERTY_KEY)
+                            .equals(classification)) {
+                        actual.add(feed.<String>getProperty(RelationshipGraphBuilder.NAME_PROPERTY_KEY));
+                        System.out.println(classification + " feed owned by falcon-user -> "
+                                + GraphUtils.vertexString(feed));
+                    }
+                }
+            }
+        }
+        Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+    }
+
+    public long getVerticesCount(final KeyIndexableGraph graph) {
+        long count = 0;
+        for (Vertex ignored : graph.getVertices()) {
+            count++;
+        }
+
+        return count;
+    }
+
+    public long getEdgesCount(final KeyIndexableGraph graph) {
+        long count = 0;
+        for (Edge ignored : graph.getEdges()) {
+            count++;
+        }
+
+        return count;
+    }
+
+    private void verifyLineageGraph(String feedType) {
+        // feeds owned by a user
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType);
+        List<String> expected = Arrays.asList("impression-feed/20140101", "clicks-feed/20140101",
+                "imp-click-join1/20140101", "imp-click-join2/20140101");
+        Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
+
+        KeyIndexableGraph graph = service.getGraph();
+
+        Iterator<Vertex> vertices = graph.getVertices("name", "impression-feed/20140101").iterator();
+        Assert.assertTrue(vertices.hasNext());
+        Vertex feedInstanceVertex = vertices.next();
+        Assert.assertEquals(
+                feedInstanceVertex.getProperty(RelationshipGraphBuilder.TYPE_PROPERTY_KEY),
+                InstanceRelationshipGraphBuilder.FEED_INSTANCE_TYPE);
+
+        Object vertexId = feedInstanceVertex.getId();
+        Vertex vertexById = graph.getVertex(vertexId);
+        Assert.assertEquals(vertexById, feedInstanceVertex);
+
+        // feeds classified as secure
+        verifyFeedsClassifiedAsSecure(feedType,
+                Arrays.asList("impression-feed/20140101", "clicks-feed/20140101", "imp-click-join2/20140101"));
+
+        // feeds owned by a user and classified as secure
+        verifyFeedsOwnedByUserAndClassification(feedType, "Financial",
+                Arrays.asList("clicks-feed/20140101", "imp-click-join1/20140101", "imp-click-join2/20140101"));
+    }
+
+    private static String[] getTestMessageArgs() {
+        return new String[]{
+            "-" + LineageArgs.NOMINAL_TIME.getOptionName(), NOMINAL_TIME,
+            "-" + LineageArgs.TIMESTAMP.getOptionName(), NOMINAL_TIME,
+
+            "-" + LineageArgs.ENTITY_NAME.getOptionName(), PROCESS_ENTITY_NAME,
+            "-" + LineageArgs.ENTITY_TYPE.getOptionName(), ("process"),
+            "-" + LineageArgs.CLUSTER.getOptionName(), CLUSTER_ENTITY_NAME,
+            "-" + LineageArgs.OPERATION.getOptionName(), OPERATION,
+
+            "-" + LineageArgs.INPUT_FEED_NAMES.getOptionName(), INPUT_FEED_NAMES,
+            "-" + LineageArgs.INPUT_FEED_PATHS.getOptionName(), INPUT_INSTANCE_PATHS,
+
+            "-" + LineageArgs.FEED_NAMES.getOptionName(), OUTPUT_FEED_NAMES,
+            "-" + LineageArgs.FEED_INSTANCE_PATHS.getOptionName(), OUTPUT_INSTANCE_PATHS,
+
+            "-" + LineageArgs.WORKFLOW_ID.getOptionName(), "workflow-01-00",
+            "-" + LineageArgs.WORKFLOW_USER.getOptionName(), FALCON_USER,
+            "-" + LineageArgs.RUN_ID.getOptionName(), "1",
+            "-" + LineageArgs.STATUS.getOptionName(), "SUCCEEDED",
+            "-" + LineageArgs.WF_ENGINE_URL.getOptionName(), "http://localhost:11000/oozie",
+            "-" + LineageArgs.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id",
+            "-" + LineageArgs.USER_WORKFLOW_NAME.getOptionName(), WORKFLOW_NAME,
+            "-" + LineageArgs.USER_WORKFLOW_VERSION.getOptionName(), WORKFLOW_VERSION,
+            "-" + LineageArgs.USER_WORKFLOW_ENGINE.getOptionName(), EngineType.PIG.name(),
+
+            "-" + LineageArgs.LOG_DIR.getOptionName(), LOGS_DIR,
+        };
+    }
+
+    private void cleanupGraphStore(KeyIndexableGraph graph) {
+        for (Edge edge : graph.getEdges()) {
+            graph.removeEdge(edge);
+        }
+
+        for (Vertex vertex : graph.getVertices()) {
+            graph.removeVertex(vertex);
+        }
+
+        graph.shutdown();
+    }
+
+    private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index 63c16ad..11dc1e4 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -147,7 +147,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
     }
 
     @Override
-    public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
+    public void onAdd(Entity entity) throws FalconException {
         if (entity.getEntityType() != EntityType.CLUSTER) {
             return;
         }
@@ -157,14 +157,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
             return;
         }
 
-        try {
-            addLibsTo(cluster);
-        } catch(FalconException e) {
-            if (!ignoreFailure) {
-                throw e;
-            }
-            LOG.warn("Failed to copy shared libraries to cluster", e);
-        }
+        addLibsTo(cluster);
     }
 
     @Override
@@ -186,4 +179,13 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
             addLibsTo(newCluster);
         }
     }
+
+    @Override
+    public void onReload(Entity entity) throws FalconException {
+        try {
+            onAdd(entity);
+        } catch (FalconException ignored) {
+            LOG.warn("Failed to copy shared libraries to cluster", ignored);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 81c017e..9bb3264 100644
--- a/pom.xml
+++ b/pom.xml
@@ -970,6 +970,48 @@
                 <artifactId>xercesImpl</artifactId>
                 <version>2.10.0</version>
             </dependency>
+
+            <dependency>
+                <groupId>com.tinkerpop.blueprints</groupId>
+                <artifactId>blueprints-core</artifactId>
+                <version>2.4.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-core-jre6</artifactId>
+                <version>0.4.2</version>
+                <exclusions>
+                    <!-- rexster does not work with servlet-api -->
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-core</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.tinkerpop.rexster</groupId>
+                        <artifactId>rexster-server</artifactId>
+                    </exclusion>
+                    <!-- asm 4.0 does not work with jersey asm 3.1 -->
+                    <exclusion>
+                        <groupId>com.tinkerpop</groupId>
+                        <artifactId>frames</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>com.esotericsoftware.reflectasm</groupId>
+                        <artifactId>reflectasm</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.ow2.asm</groupId>
+                        <artifactId>asm</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+
+            <dependency>
+                <groupId>com.thinkaurelius.titan</groupId>
+                <artifactId>titan-berkeleyje-jre6</artifactId>
+                <version>0.4.2</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index c1d629d..a32d2ee 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -23,6 +23,7 @@ import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.messaging.EntityInstanceMessage.ARG;
+import org.apache.falcon.metadata.MetadataMappingService;
 import org.apache.falcon.rerun.event.RerunEvent.RerunType;
 import org.apache.falcon.rerun.handler.AbstractRerunHandler;
 import org.apache.falcon.rerun.handler.RerunHandlerFactory;
@@ -126,6 +127,7 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
                     SchemaHelper.formatDateUTC(startTime), duration);
 
                 notifySLAService(cluster, entityName, entityType, nominalTime, duration);
+                notifyMetadataMappingService(entityName, operation, mapMessage.getString(ARG.logDir.getArgName()));
             }
         } catch (JMSException e) {
             LOG.info("Error in onMessage for subscriber of topic: " + this.toString(), e);
@@ -150,6 +152,12 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
         return Services.get().getService(SLAMonitoringService.SERVICE_NAME);
     }
 
+    private void notifyMetadataMappingService(String entityName, String operation,
+                                              String logDir) throws FalconException {
+        MetadataMappingService service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
+        service.mapLineage(entityName, operation, logDir);
+    }
+
     private void debug(MapMessage mapMessage) throws JMSException {
         StringBuilder buff = new StringBuilder();
         buff.append("Received:{");

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/src/bin/falcon-config.sh
----------------------------------------------------------------------
diff --git a/src/bin/falcon-config.sh b/src/bin/falcon-config.sh
index 47cc1e3..36be053 100644
--- a/src/bin/falcon-config.sh
+++ b/src/bin/falcon-config.sh
@@ -112,7 +112,7 @@ case $type in
     export FALCON_PID_DIR
     FALCON_PID_FILE=${FALCON_PID_DIR}/${app}.pid
     export FALCON_PID_FILE
-    FALCON_DATA_DIR=${FALCON_DATA_DIR:-${BASEDIR}/logs/data}
+    FALCON_DATA_DIR=${FALCON_DATA_DIR:-${BASEDIR}/data}
     FALCON_HOME_DIR="${FALCON_HOME_DIR:-$BASEDIR}"
     export FALCON_HOME_DIR
   ;;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/src/conf/startup.properties
----------------------------------------------------------------------
diff --git a/src/conf/startup.properties b/src/conf/startup.properties
index 0d0ab41..74f0d6b 100644
--- a/src/conf/startup.properties
+++ b/src/conf/startup.properties
@@ -36,9 +36,10 @@
                         org.apache.falcon.service.ProcessSubscriberService,\
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
-						org.apache.falcon.rerun.service.LateRunService,\
-						org.apache.falcon.service.SLAMonitoringService,\
-						org.apache.falcon.service.LogCleanupService
+                        org.apache.falcon.rerun.service.LateRunService,\
+                        org.apache.falcon.service.SLAMonitoringService,\
+                        org.apache.falcon.metadata.MetadataMappingService,\
+                        org.apache.falcon.service.LogCleanupService
 prism.application.services=org.apache.falcon.entity.store.ConfigurationStore
 *.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
                         org.apache.falcon.entity.ColoClusterRelation,\
@@ -56,7 +57,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
 ######### System startup parameters #########
 
 # Location to store user entity configurations
-*.config.store.uri=file://${falcon.home}/store
+*.config.store.uri=file://${falcon.home}/data/store
 
 # Location of libraries that is shipped to Hadoop
 *.system.lib.location=${falcon.home}/server/webapp/falcon/WEB-INF/lib
@@ -80,6 +81,17 @@ prism.system.lib.location=${falcon.home}/server/webapp/prism/WEB-INF/lib
 *.internal.queue.size=1000
 
 
+######### Graph Database Properties #########
+# Graph implementation
+*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
+
+# Graph Storage
+*.falcon.graph.storage.directory=/${falcon.home}/data/graphdb
+*.falcon.graph.storage.backend=berkeleyje
+*.falcon.graph.serialize.path=/${falcon.home}/data
+*.falcon.graph.preserve.history=false
+
+
 ######### Authentication Properties #########
 
 # Authentication type must be specified: simple|kerberos

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 0c9e601..5c293e1 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -171,6 +171,11 @@
         </dependency>
 
         <dependency>
+            <groupId>javax.servlet.jsp</groupId>
+            <artifactId>jsp-api</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.mortbay.jetty</groupId>
             <artifactId>jetty</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
index d503735..f946202 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.cli;
 
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.util.OozieTestUtils;
+import org.apache.falcon.util.StartupProperties;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -36,6 +37,10 @@ public class FalconCLISmokeIT {
     @BeforeClass
     public void prepare() throws Exception {
         TestContext.prepare();
+
+        String services = StartupProperties.get().getProperty("application.services");
+        StartupProperties.get().setProperty("application.services",
+                services + ",org.apache.falcon.metadata.MetadataMappingService");
     }
 
     @Test
@@ -47,18 +52,20 @@ public class FalconCLISmokeIT {
 
         filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
         Assert.assertEquals(-1,
-                executeWithURL("entity -submitAndSchedule -type cluster -file "
-                        + filePath));
+                executeWithURL("entity -submitAndSchedule -type cluster -file " + filePath));
         context.setCluster(overlay.get("cluster"));
 
+        // this is necessary for lineage
+        Assert.assertEquals(0, executeWithURL("entity -submit -type cluster -file " + filePath));
+
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
-                executeWithURL("entity -submitAndSchedule -type feed -file "
-                        + filePath));
+                executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
         Assert.assertEquals(0,
-                executeWithURL("entity -submitAndSchedule -type feed -file "
-                        + filePath));
+                executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
         filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
         Assert.assertEquals(0,
                 executeWithURL("entity -submit -type feed -file " + filePath));
@@ -69,8 +76,7 @@ public class FalconCLISmokeIT {
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
-                executeWithURL("entity -validate -type process -file "
-                        + filePath));
+                executeWithURL("entity -validate -type process -file " + filePath));
 
         filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
         Assert.assertEquals(0,
@@ -80,17 +86,14 @@ public class FalconCLISmokeIT {
         OozieTestUtils.waitForProcessWFtoStart(context);
 
         Assert.assertEquals(0,
-                executeWithURL("entity -definition -type cluster -name "
-                        + overlay.get("cluster")));
+                executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")));
 
         Assert.assertEquals(0,
                 executeWithURL("instance -status -type feed -name "
-                        + overlay.get("outputFeedName")
-                        + " -start " + START_INSTANCE));
+                        + overlay.get("outputFeedName") + " -start " + START_INSTANCE));
 
         Assert.assertEquals(0,
-                executeWithURL("instance -running -type process -name "
-                        + overlay.get("processName")));
+                executeWithURL("instance -running -type process -name " + overlay.get("processName")));
     }
 
     private int executeWithURL(String command) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
index 082f541..dceb2f2 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
@@ -63,6 +63,7 @@ public class EntityManagerJerseySmokeIT {
         contexts.remove();
     }
 
+    @Test (dependsOnMethods = "testFeedSchedule")
     public void testProcessDeleteAndSchedule() throws Exception {
         //Submit process with invalid property so that coord submit fails and bundle goes to failed state
         TestContext context = newContext();
@@ -101,6 +102,7 @@ public class EntityManagerJerseySmokeIT {
         Assert.assertEquals(bundles.size(), 2);
     }
 
+    @Test
     public void testFeedSchedule() throws Exception {
         TestContext context = newContext();
         ClientResponse response;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/ecb919c6/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index b7a2256..a1e088d 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -342,6 +342,11 @@ public class TestContext {
         // setup a logged in user
         CurrentUser.authenticate(REMOTE_USER);
 
+        // disable recording lineage metadata if enabled
+        String services = StartupProperties.get().getProperty("application.services");
+        StartupProperties.get().setProperty("application.services",
+                services.replace("org.apache.falcon.metadata.MetadataMappingService", ""));
+
         Map<String, String> overlay = new HashMap<String, String>();
         overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
         overlay.put("colo", "gs");