Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/12 22:29:32 UTC

[4/5] git commit: FALCON-661 Add list types to Lineage API. Contributed by Balu Vellanki

FALCON-661 Add list types to Lineage API. Contributed by Balu Vellanki


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/7f98570b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/7f98570b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/7f98570b

Branch: refs/heads/master
Commit: 7f98570bb736dc7307d385271300568f185d3ffe
Parents: 42a1b76
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Sep 12 13:27:34 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Sep 12 13:27:34 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../java/org/apache/falcon/cli/FalconCLI.java   |   6 +
 .../apache/falcon/cli/FalconMetadataCLI.java    |  99 +++++++
 .../org/apache/falcon/client/FalconClient.java  |  43 +++
 .../falcon/metadata/RelationshipType.java       |  64 +++++
 .../falcon/metadata/RelationshipType.java       |  64 -----
 common/src/main/resources/startup.properties    |   1 +
 docs/src/site/twiki/FalconCLI.twiki             |  16 ++
 docs/src/site/twiki/restapi/MetadataList.twiki  |  30 ++
 docs/src/site/twiki/restapi/ResourceList.twiki  |   6 +
 .../metadata/AbstractMetadataResource.java      |  57 ++++
 .../metadata/LineageMetadataResource.java       |  33 +--
 .../metadata/MetadataDiscoveryResource.java     | 159 +++++++++++
 .../metadata/LineageMetadataResourceTest.java   | 277 +++----------------
 .../metadata/MetadataDiscoveryResourceTest.java | 152 ++++++++++
 .../resource/metadata/MetadataTestContext.java  | 231 ++++++++++++++++
 .../java/org/apache/falcon/cli/FalconCLIIT.java |  37 +++
 .../org/apache/falcon/process/PigProcessIT.java |   2 +-
 .../falcon/process/TableStorageProcessIT.java   |   2 +-
 .../resource/MetadataResourceJerseyIT.java      | 104 +++++++
 .../org/apache/falcon/resource/TestContext.java |  14 +-
 21 files changed, 1066 insertions(+), 334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5764c47..f1132d9 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-661 Add list types to Lineage API (Balu Vellanki via
+   Venkatesh Seetharam)
+
    FALCON-654 Exclude junit dependency in pom (Ruslan Ostafiychuk)
 
    FALCON-640 Add ability to specify sort order for orderBy param in RestAPI

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index fc2236b..f7229ec 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -59,6 +59,7 @@ public class FalconCLI {
     public static final String STATUS_OPTION = "status";
     public static final String ADMIN_CMD = "admin";
     public static final String HELP_CMD = "help";
+    public static final String METADATA_CMD = "metadata";
     private static final String VERSION_CMD = "version";
     private static final String STACK_OPTION = "stack";
 
@@ -162,6 +163,7 @@ public class FalconCLI {
     public synchronized int run(final String[] args) {
 
         CLIParser parser = new CLIParser("falcon", FALCON_HELP);
+        FalconMetadataCLI metadataCLI = new FalconMetadataCLI();
 
         parser.addCommand(ADMIN_CMD, "", "admin operations", createAdminOptions(), true);
         parser.addCommand(HELP_CMD, "", "display usage", new Options(), false);
@@ -173,6 +175,8 @@ public class FalconCLI {
                 "Process instances operations like running, status, kill, suspend, resume, rerun, logs",
                 instanceOptions(), false);
         parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true);
+        parser.addCommand(METADATA_CMD, "", "Metadata operations like list, relations",
+                metadataCLI.createMetadataOptions(), true);
         parser.addCommand(RECIPE_CMD, "", "recipe operations", createRecipeOptions(), true);
 
         try {
@@ -193,6 +197,8 @@ public class FalconCLI {
                     instanceCommand(commandLine, client);
                 } else if (command.getName().equals(GRAPH_CMD)) {
                     graphCommand(commandLine, client);
+                } else if (command.getName().equals(METADATA_CMD)) {
+                    metadataCLI.metadataCommand(commandLine, client);
                 } else if (command.getName().equals(RECIPE_CMD)) {
                     recipeCommand(commandLine, client);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
new file mode 100644
index 0000000..d3956b0
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/cli/FalconMetadataCLI.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.client.FalconCLIException;
+import org.apache.falcon.client.FalconClient;
+import org.apache.falcon.metadata.RelationshipType;
+
+import java.io.PrintStream;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Metadata extension to Falcon Command Line Interface - wraps the RESTful API for Metadata.
+ */
+public class FalconMetadataCLI {
+
+    public static final AtomicReference<PrintStream> OUT = new AtomicReference<PrintStream>(System.out);
+
+    public static final String LIST_OPT = "list";
+
+    public static final String URL_OPTION = "url";
+    public static final String TYPE_OPT = "type";
+    public static final String CLUSTER_OPT = "cluster";
+
+    public FalconMetadataCLI() {}
+
+    public void metadataCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
+        Set<String> optionsList = new HashSet<String>();
+        for (Option option : commandLine.getOptions()) {
+            optionsList.add(option.getOpt());
+        }
+
+        String result = null;
+        String dimensionType = commandLine.getOptionValue(TYPE_OPT);
+        String cluster = commandLine.getOptionValue(CLUSTER_OPT);
+
+        validateDimensionType(dimensionType.toUpperCase());
+
+        if (optionsList.contains(LIST_OPT)) {
+            result = client.getDimensionList(dimensionType, cluster);
+        }
+
+        OUT.get().println(result);
+    }
+
+    private void validateDimensionType(String dimensionType) throws FalconCLIException {
+        if (StringUtils.isEmpty(dimensionType)
+                ||  dimensionType.contains("INSTANCE")) {
+            throw new FalconCLIException("Invalid value provided for queryParam \"type\" " + dimensionType);
+        }
+        try {
+            RelationshipType.valueOf(dimensionType);
+        } catch (IllegalArgumentException iae) {
+            throw new FalconCLIException("Invalid value provided for queryParam \"type\" " + dimensionType);
+        }
+    }
+
+    public Options createMetadataOptions() {
+        Options metadataOptions = new Options();
+
+        OptionGroup group = new OptionGroup();
+        Option list = new Option(LIST_OPT, false, "List all dimensions");
+        group.addOption(list);
+
+        Option url = new Option(URL_OPTION, true, "Falcon URL");
+        Option type = new Option(TYPE_OPT, true, "Dimension type");
+        Option cluster = new Option(CLUSTER_OPT, true, "Cluster name");
+
+        metadataOptions.addOptionGroup(group);
+        metadataOptions.addOption(url);
+        metadataOptions.addOption(type);
+        metadataOptions.addOption(cluster);
+
+        return metadataOptions;
+    }
+}
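
For a quick illustration, the new subcommand can be driven through FalconCLI.run; this is only a sketch, and the accessible no-arg FalconCLI constructor is an assumption here (only run(String[]) appears in this diff):

    import org.apache.falcon.cli.FalconCLI;

    public class MetadataCliSketch {
        public static void main(String[] args) throws Exception {
            // Assumption: FalconCLI's no-arg constructor is accessible from
            // caller code; only run(String[]) is visible in this diff.
            int exitCode = new FalconCLI().run(new String[]{
                "metadata", "-list",
                "-type", "process_entity",
                "-cluster", "primary-cluster",
                "-url", "http://localhost:15000",
            });
            System.exit(exitCode);
        }
    }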

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index a63a76b..205ff31 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.net.util.TrustManagerUtils;
 import org.apache.falcon.LifeCycle;
+import org.apache.falcon.cli.FalconMetadataCLI;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.SchemaHelper;
 import org.apache.falcon.recipe.RecipeTool;
@@ -203,6 +204,24 @@ public class FalconClient {
     }
 
     /**
+     * Methods allowed on Metadata Resources.
+     */
+    protected static enum MetadataOperations {
+
+        LIST("api/metadata/", HttpMethod.GET, MediaType.APPLICATION_JSON);
+
+        private String path;
+        private String method;
+        private String mimeType;
+
+        MetadataOperations(String path, String method, String mimeType) {
+            this.path = path;
+            this.method = method;
+            this.mimeType = mimeType;
+        }
+    }
+
+    /**
      * Methods allowed on Process Instance Resources.
      */
     protected static enum Instances {
@@ -484,6 +503,10 @@ public class FalconClient {
         return clientResponse.getStatus();
     }
 
+    public String getDimensionList(String dimensionType, String cluster) throws FalconCLIException {
+        return sendMetadataListRequest(MetadataOperations.LIST, dimensionType, cluster);
+    }
+
     /**
      * Converts a InputStream into ServletInputStream.
      *
@@ -765,6 +788,26 @@ public class FalconClient {
         return parseStringResult(clientResponse);
     }
 
+    private String sendMetadataListRequest(final MetadataOperations op, final String dimensionType,
+                                           final String cluster) throws FalconCLIException {
+        WebResource resource = service
+                .path(op.path)
+                .path(dimensionType)
+                .path(FalconMetadataCLI.LIST_OPT);
+
+        if (!StringUtils.isEmpty(cluster)) {
+            resource = resource.queryParam(FalconMetadataCLI.CLUSTER_OPT, cluster);
+        }
+
+        ClientResponse clientResponse = resource
+                .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+                .accept(op.mimeType).type(op.mimeType)
+                .method(op.method, ClientResponse.class);
+
+        checkIfSuccessful(clientResponse);
+        return parseStringResult(clientResponse);
+    }
+
     private String parseAPIResult(ClientResponse clientResponse)
         throws FalconCLIException {
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
new file mode 100644
index 0000000..f034772
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/metadata/RelationshipType.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.metadata;
+
+/**
+ * Enumerates Relationship types.
+ */
+public enum RelationshipType {
+
+    // entity vertex types
+    CLUSTER_ENTITY("cluster-entity"),
+    FEED_ENTITY("feed-entity"),
+    PROCESS_ENTITY("process-entity"),
+
+    // instance vertex types
+    FEED_INSTANCE("feed-instance"),
+    PROCESS_INSTANCE("process-instance"),
+
+    // Misc vertex types
+    USER("user"),
+    COLO("data-center"),
+    TAGS("classification"),
+    GROUPS("group"),
+    PIPELINES("pipelines");
+
+
+    private final String name;
+
+    RelationshipType(java.lang.String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public static RelationshipType fromString(String value) {
+        if (value != null) {
+            for (RelationshipType type : RelationshipType.values()) {
+                if (value.equals(type.getName())) {
+                    return type;
+                }
+            }
+        }
+
+        throw new IllegalArgumentException("No constant with value " + value + " found");
+    }
+}
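
A small illustration of the two lookup paths the enum offers: valueOf on the constant name (as the CLI and REST resource do after upper-casing user input) and fromString on the display name stored on graph vertices:

    import org.apache.falcon.metadata.RelationshipType;

    public class RelationshipTypeSketch {
        public static void main(String[] args) {
            // The CLI/REST layers upper-case user input and resolve it against
            // the constant names, e.g. "process_entity" -> PROCESS_ENTITY.
            RelationshipType byConstant = RelationshipType.valueOf("PROCESS_ENTITY");

            // fromString resolves the display name used on graph vertices.
            RelationshipType byDisplayName = RelationshipType.fromString("process-entity");

            System.out.println(byConstant == byDisplayName); // true
            System.out.println(byConstant.getName());        // process-entity
            // Unknown display names raise IllegalArgumentException.
        }
    }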

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java b/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
deleted file mode 100644
index f034772..0000000
--- a/common/src/main/java/org/apache/falcon/metadata/RelationshipType.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.metadata;
-
-/**
- * Enumerates Relationship types.
- */
-public enum RelationshipType {
-
-    // entity vertex types
-    CLUSTER_ENTITY("cluster-entity"),
-    FEED_ENTITY("feed-entity"),
-    PROCESS_ENTITY("process-entity"),
-
-    // instance vertex types
-    FEED_INSTANCE("feed-instance"),
-    PROCESS_INSTANCE("process-instance"),
-
-    // Misc vertex types
-    USER("user"),
-    COLO("data-center"),
-    TAGS("classification"),
-    GROUPS("group"),
-    PIPELINES("pipelines");
-
-
-    private final String name;
-
-    RelationshipType(java.lang.String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public static RelationshipType fromString(String value) {
-        if (value != null) {
-            for (RelationshipType type : RelationshipType.values()) {
-                if (value.equals(type.getName())) {
-                    return type;
-                }
-            }
-        }
-
-        throw new IllegalArgumentException("No constant with value " + value + " found");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
index 314146c..e233b2a 100644
--- a/common/src/main/resources/startup.properties
+++ b/common/src/main/resources/startup.properties
@@ -36,6 +36,7 @@
                         org.apache.falcon.entity.store.ConfigurationStore,\
                         org.apache.falcon.rerun.service.RetryService,\
                         org.apache.falcon.rerun.service.LateRunService,\
+                        org.apache.falcon.metadata.MetadataMappingService,\
                         org.apache.falcon.service.LogCleanupService
 
 ##### Falcon Configuration Store Change listeners #####

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/FalconCLI.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/FalconCLI.twiki b/docs/src/site/twiki/FalconCLI.twiki
index afa8136..d4ce366 100644
--- a/docs/src/site/twiki/FalconCLI.twiki
+++ b/docs/src/site/twiki/FalconCLI.twiki
@@ -301,6 +301,22 @@ Status returns the current state of Falcon (running or stopped).
 Usage:
 $FALCON_HOME/bin/falcon admin -status
 
+---++ Metadata Options
+
+---+++ List
+
+List all dimensions of a given type.
+
+Usage:
+$FALCON_HOME/bin/falcon metadata -list -type [cluster_entity|feed_entity|process_entity|user|colo|tags|groups|pipelines]
+
+Optional Args : -cluster <<cluster name>>
+
+Lists all dimensions of the given type. If the user provides the optional cluster parameter, only the dimensions related to that cluster are listed.
+Example:
+$FALCON_HOME/bin/falcon metadata -list -type process_entity -cluster primary-cluster
+$FALCON_HOME/bin/falcon metadata -list -type tags
+
 ---++ Recipe Options
 
 ---+++ Submit Recipe

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/restapi/MetadataList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/MetadataList.twiki b/docs/src/site/twiki/restapi/MetadataList.twiki
new file mode 100644
index 0000000..b6da203
--- /dev/null
+++ b/docs/src/site/twiki/restapi/MetadataList.twiki
@@ -0,0 +1,30 @@
+---++  GET api/metadata/:type/list
+   * <a href="#Description">Description</a>
+   * <a href="#Parameters">Parameters</a>
+   * <a href="#Results">Results</a>
+   * <a href="#Examples">Examples</a>
+
+---++ Description
+Get all dimensions of specified type.
+
+---++ Parameters
+   * :type Valid dimension types are cluster_entity, feed_entity, process_entity, user, colo, tags, groups, pipelines
+   * cluster <optional query param> Show dimensions related to this cluster.
+
+
+---++ Results
+List of dimensions that match the requested type [and cluster].
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/metadata/process_entity/list?cluster=primary-cluster
+</verbatim>
+---+++ Result
+<verbatim>
+{
+    "results": ["sampleIngestProcess","testProcess","anotherProcess"],
+    "totalSize": 3
+}
+</verbatim>
+

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index d9cb3cb..34a07d5 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -5,6 +5,7 @@
    * <a href="#REST_Call_on_Feed_and_Process_Instances">REST Call on Feed/Process Instances</a>
    * <a href="#REST_Call_on_Admin_Resource">REST Call on Admin Resource</a>
    * <a href="#REST_Call_on_Lineage_Graph">REST Call on Lineage Graph Resource</a>
+   * <a href="#REST_Call_on_Metadata_Resource">REST Call on Metadata Resource</a>
 
 ---++ Authentication
 
@@ -76,3 +77,8 @@ See also: [[../Security.twiki][Security in Falcon]]
 | GET         | [[AdjacentVertices][api/graphs/lineage/vertices/:id/:direction]]                     | get the adjacent vertices or edges of the vertex with the specified direction |
 | GET         | [[AllEdges][api/graphs/lineage//edges/all]]                                          | get all edges                                                                 |
 | GET         | [[Edge][api/graphs/lineage/edges/:id]]                                               | get the edge with the specified id                                            |
+
+---++ REST Call on Metadata Resource
+
+| *Call Type* | *Resource*                                                                           | *Description*                                                                 |
+| GET         | [[MetadataList][api/metadata/:dimension-type/list]]                                  | list of dimensions  |
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
new file mode 100644
index 0000000..a6c9b78
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/AbstractMetadataResource.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Graph;
+import org.apache.falcon.metadata.MetadataMappingService;
+import org.apache.falcon.service.FalconService;
+import org.apache.falcon.service.Services;
+
+import java.util.Set;
+
+/**
+ * A base class for managing Metadata operations.
+ */
+public abstract class AbstractMetadataResource {
+    public static final String RESULTS = "results";
+    public static final String TOTAL_SIZE = "totalSize";
+
+    protected MetadataMappingService service;
+
+    public AbstractMetadataResource() {
+        service = (MetadataMappingService)getService(MetadataMappingService.SERVICE_NAME);
+    }
+
+    public static FalconService getService(String serviceName) {
+        return (Services.get().isRegistered(serviceName))
+                ? Services.get().getService(serviceName) : null;
+    }
+
+    protected Graph getGraph() {
+        return service.getGraph();
+    }
+
+    protected Set<String> getVertexIndexedKeys() {
+        return service.getVertexIndexedKeys();
+    }
+
+    protected Set<String> getEdgeIndexedKeys() {
+        return service.getEdgeIndexedKeys();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
index cf6b6b1..2181bea 100644
--- a/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/LineageMetadataResource.java
@@ -21,24 +21,21 @@ package org.apache.falcon.resource.metadata;
 import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Element;
-import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
 import com.tinkerpop.blueprints.VertexQuery;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONMode;
 import com.tinkerpop.blueprints.util.io.graphson.GraphSONUtility;
+import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.metadata.GraphUtils;
-import org.apache.falcon.metadata.MetadataMappingService;
 import org.apache.falcon.metadata.RelationshipLabel;
 import org.apache.falcon.metadata.RelationshipProperty;
 import org.apache.falcon.metadata.RelationshipType;
-import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONException;
 import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
 
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -51,7 +48,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Jersey Resource for lineage metadata operations.
@@ -59,35 +55,10 @@ import java.util.Set;
  * https://github.com/tinkerpop/rexster/wiki/Basic-REST-API
  */
 @Path("graphs/lineage")
-public class LineageMetadataResource {
+public class LineageMetadataResource extends AbstractMetadataResource {
 
     private static final Logger LOG = LoggerFactory.getLogger(LineageMetadataResource.class);
 
-    public static final String RESULTS = "results";
-    public static final String TOTAL_SIZE = "totalSize";
-
-    private final MetadataMappingService service;
-
-    public LineageMetadataResource() {
-        if (Services.get().isRegistered(MetadataMappingService.SERVICE_NAME)) {
-            service = Services.get().getService(MetadataMappingService.SERVICE_NAME);
-        } else {
-            service = null;
-        }
-    }
-
-    private Graph getGraph() {
-        return service.getGraph();
-    }
-
-    private Set<String> getVertexIndexedKeys() {
-        return service.getVertexIndexedKeys();
-    }
-
-    private Set<String> getEdgeIndexedKeys() {
-        return service.getEdgeIndexedKeys();
-    }
-
     /**
      * Dump the graph.
      *

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
new file mode 100644
index 0000000..d15d7cc
--- /dev/null
+++ b/prism/src/main/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResource.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Direction;
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.GraphQuery;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.metadata.RelationshipLabel;
+import org.apache.falcon.metadata.RelationshipProperty;
+import org.apache.falcon.metadata.RelationshipType;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.Iterator;
+
+/**
+ * Jersey Resource for metadata operations.
+ */
+@Path("metadata")
+public class MetadataDiscoveryResource extends AbstractMetadataResource {
+
+    /**
+     * Get list of dimensions for the given dimension-type.
+     * <p/>
+     * GET http://host/metadata/dimension-type/list
+     */
+    @GET
+    @Path("/{type}/list")
+    @Produces({MediaType.APPLICATION_JSON})
+    public Response listDimensionValues(@PathParam("type") String type,
+                                        @QueryParam("cluster") final String clusterName) {
+        RelationshipType relationshipType = validateAndParseDimensionType(type.toUpperCase());
+        GraphQuery query = getGraph().query();
+        JSONArray dimensionValues = new JSONArray();
+
+        if (StringUtils.isEmpty(clusterName)) {
+            query = query.has(RelationshipProperty.TYPE.getName(), relationshipType.getName());
+            dimensionValues = getDimensionsFromVertices(dimensionValues,
+                    query.vertices().iterator());
+        } else {
+            // Get clusterVertex, get adjacent vertices of clusterVertex that match dimension type.
+            query = query
+                    .has(RelationshipProperty.TYPE.getName(), RelationshipType.CLUSTER_ENTITY.getName())
+                    .has(RelationshipProperty.NAME.getName(), clusterName);
+            Iterator<Vertex> clusterIterator = query.vertices().iterator();
+            if (clusterIterator.hasNext()) {
+                dimensionValues = getDimensionsFromClusterVertex(
+                        dimensionValues, clusterIterator.next(), relationshipType);
+            } // else, no cluster found. Return empty results
+        }
+
+        try {
+            JSONObject response = new JSONObject();
+            response.put(RESULTS, dimensionValues);
+            response.put(TOTAL_SIZE, dimensionValues.length());
+            return Response.ok(response).build();
+        } catch (JSONException e) {
+            throw new WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                    .entity(JSONObject.quote("An error occurred: " + e.getMessage())).build());
+        }
+    }
+
+    private JSONArray getDimensionsFromClusterVertex(JSONArray dimensionValues, Vertex clusterVertex,
+                                                     RelationshipType relationshipType) {
+        switch (relationshipType) {
+        case FEED_ENTITY:
+            return getDimensionsFromEdges(dimensionValues, Direction.OUT,
+                    clusterVertex.getEdges(
+                            Direction.IN, RelationshipLabel.FEED_CLUSTER_EDGE.getName()).iterator(),
+                    relationshipType);
+
+        case PROCESS_ENTITY:
+            return getDimensionsFromEdges(dimensionValues, Direction.OUT,
+                    clusterVertex.getEdges(
+                            Direction.IN, RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator(),
+                    relationshipType);
+
+        case COLO:
+            return getDimensionsFromEdges(dimensionValues, Direction.IN,
+                    clusterVertex.getEdges(
+                            Direction.OUT, RelationshipLabel.CLUSTER_COLO.getName()).iterator(),
+                    relationshipType);
+
+        case CLUSTER_ENTITY:
+            return getDimensionFromVertex(dimensionValues, clusterVertex);
+
+        default:
+            return dimensionValues;
+        }
+    }
+
+    private JSONArray getDimensionsFromVertices(JSONArray dimensionValues,
+                                                Iterator<Vertex> vertexIterator) {
+        while (vertexIterator.hasNext()) {
+            dimensionValues = getDimensionFromVertex(dimensionValues, vertexIterator.next());
+        }
+
+        return dimensionValues;
+    }
+
+    private JSONArray getDimensionsFromEdges(JSONArray dimensionValues, Direction direction,
+                                             Iterator<Edge> edgeIterator,
+                                             RelationshipType relationshipType) {
+        while (edgeIterator.hasNext()) {
+            Vertex vertex = edgeIterator.next().getVertex(direction);
+            if (vertex.getProperty(RelationshipProperty.TYPE.getName())
+                    .equals(relationshipType.getName())) {
+                dimensionValues = getDimensionFromVertex(dimensionValues, vertex);
+            }
+        }
+
+        return dimensionValues;
+    }
+
+    private JSONArray getDimensionFromVertex(JSONArray dimensionValues, Vertex vertex) {
+        return dimensionValues.put(vertex.getProperty(RelationshipProperty.NAME.getName()));
+    }
+
+    private RelationshipType validateAndParseDimensionType(String type) {
+        if (StringUtils.isEmpty(type) || type.contains("INSTANCE")) {
+            throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+                    .entity("Invalid Dimension type : " +  type).type("text/plain").build());
+        }
+
+        try {
+            return RelationshipType.valueOf(type);
+        } catch (IllegalArgumentException iae) {
+            throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+                    .entity("Invalid Dimension type : " +  type).type("text/plain").build());
+        }
+    }
+}
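
The resource can also be driven directly (much as the new unit tests do); a sketch assuming MetadataMappingService has already been registered with Services so the resource has a graph to query:

    import javax.ws.rs.core.Response;
    import org.apache.falcon.resource.metadata.MetadataDiscoveryResource;

    public class DiscoverySketch {
        public static void main(String[] args) {
            // Assumption: MetadataMappingService is registered (see the
            // startup.properties change above); otherwise getGraph() has
            // nothing to query.
            MetadataDiscoveryResource resource = new MetadataDiscoveryResource();

            // Same as GET api/metadata/feed_entity/list?cluster=primary-cluster
            Response response = resource.listDimensionValues("feed_entity", "primary-cluster");
            System.out.println(response.getEntity()); // {"results": [...], "totalSize": N}
        }
    }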

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
index 9bef7f5..0a8efc9 100644
--- a/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/LineageMetadataResourceTest.java
@@ -22,26 +22,10 @@ import com.tinkerpop.blueprints.Direction;
 import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
-import org.apache.falcon.entity.Storage;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.CatalogTable;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.feed.Location;
-import org.apache.falcon.entity.v0.feed.LocationType;
-import org.apache.falcon.entity.v0.feed.Locations;
-import org.apache.falcon.entity.v0.process.EngineType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.metadata.MetadataMappingService;
 import org.apache.falcon.metadata.RelationshipLabel;
 import org.apache.falcon.metadata.RelationshipProperty;
-import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.WorkflowExecutionArgs;
-import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
 import org.json.simple.JSONValue;
 import org.testng.Assert;
@@ -55,7 +39,6 @@ import java.io.File;
 import java.io.FileFilter;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -63,63 +46,18 @@ import java.util.Map;
  * Unit tests for org.apache.falcon.resource.metadata.LineageMetadataResource.
  */
 public class LineageMetadataResourceTest {
-    public static final String FALCON_USER = "falcon-user";
-    private static final String LOGS_DIR = "target/log";
-    private static final String NOMINAL_TIME = "2014-01-01-01-00";
-    public static final String OPERATION = "GENERATE";
-
-    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
-    public static final String PROCESS_ENTITY_NAME = "sample-process";
-    public static final String COLO_NAME = "west-coast";
-    public static final String WORKFLOW_NAME = "imp-click-join-workflow";
-    public static final String WORKFLOW_VERSION = "1.0.9";
-
-    public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
-    public static final String INPUT_INSTANCE_PATHS =
-            "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
-
-    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
-    public static final String OUTPUT_INSTANCE_PATHS =
-            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-
-    private ConfigurationStore configStore;
-    private MetadataMappingService service;
-
-    private Cluster clusterEntity;
-    private List<Feed> inputFeeds = new ArrayList<Feed>();
-    private List<Feed> outputFeeds = new ArrayList<Feed>();
 
+    private MetadataTestContext testContext;
 
     @BeforeClass
     public void setUp() throws Exception {
-        CurrentUser.authenticate(FALCON_USER);
-
-        configStore = ConfigurationStore.get();
-
-        Services.get().register(new WorkflowJobEndNotificationService());
-        Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.SERVICE_NAME));
-
-        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
-        service = new MetadataMappingService();
-        service.init();
-        Services.get().register(service);
-        Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
-
-        addClusterEntity();
-        addFeedEntity();
-        addProcessEntity();
-        addInstance();
+        testContext = new MetadataTestContext();
+        testContext.setUp();
     }
 
     @AfterClass
     public void tearDown() throws Exception {
-        cleanupGraphStore(service.getGraph());
-        cleanupConfigurationStore(configStore);
-
-        service.destroy();
-
-        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
-        Services.get().reset();
+        testContext.tearDown();
     }
 
     @Test (expectedExceptions = WebApplicationException.class)
@@ -132,10 +70,10 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testGetVerticesWithId() throws Exception {
-        Vertex vertex = service.getGraph().getVertices(RelationshipProperty.NAME.getName(),
-                PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z").iterator().next();
-
         LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(RelationshipProperty.NAME.getName(),
+                MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z").iterator().next();
+
         Response response = resource.getVertex(String.valueOf(vertex.getId()));
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -147,11 +85,11 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testGetVertexPropertiesForProcessInstance() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
 
-        LineageMetadataResource resource = new LineageMetadataResource();
         Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -166,10 +104,10 @@ public class LineageMetadataResourceTest {
     @Test
     public void testGetVertexPropertiesForFeedInstance() throws Exception {
         String feedInstance = "impression-feed/2014-01-01T00:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), feedInstance).iterator().next();
 
-        LineageMetadataResource resource = new LineageMetadataResource();
         Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -183,10 +121,10 @@ public class LineageMetadataResourceTest {
     @Test
     public void testGetVertexPropertiesForFeedInstanceNoTags() throws Exception {
         String feedInstance = "clicks-feed/2014-01-01T00:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), feedInstance).iterator().next();
 
-        LineageMetadataResource resource = new LineageMetadataResource();
         Response response = resource.getVertexProperties(String.valueOf(vertex.getId()), "true");
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -199,11 +137,11 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testGetVerticesWithKeyValue() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
 
-        LineageMetadataResource resource = new LineageMetadataResource();
         Response response = resource.getVertices(RelationshipProperty.NAME.getName(), processInstance);
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -227,14 +165,17 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testVertexEdgesForIdAndDirectionOut() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
         String vertexId = String.valueOf(vertex.getId());
 
-        int expectedSize = 6; // 2 output feeds, user, cluster, process entity, tag
-        List<String> expected = Arrays.asList(FALCON_USER, CLUSTER_ENTITY_NAME, "Critical",
-                PROCESS_ENTITY_NAME, "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
+        int expectedSize = 7; // 2 output feeds, user, cluster, process entity, tag, pipeline
+        List<String> expected = Arrays.asList(MetadataTestContext.FALCON_USER,
+                MetadataTestContext.CLUSTER_ENTITY_NAME, "Critical",
+                MetadataTestContext.PROCESS_ENTITY_NAME,
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z");
 
         verifyVertexEdges(vertexId, LineageMetadataResource.OUT, expectedSize, expected);
         verifyVertexEdgesCount(vertexId, LineageMetadataResource.OUT_COUNT, expectedSize);
@@ -244,8 +185,9 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testVertexEdgesForIdAndDirectionIn() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
         String vertexId = String.valueOf(vertex.getId());
 
@@ -260,14 +202,17 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testVertexEdgesForIdAndDirectionBoth() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
         String vertexId = String.valueOf(vertex.getId());
 
-        int expectedSize = 8;
-        List<String> expected = Arrays.asList(FALCON_USER, CLUSTER_ENTITY_NAME, "Critical",
-                PROCESS_ENTITY_NAME, "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z",
+        int expectedSize = 9;
+        List<String> expected = Arrays.asList(MetadataTestContext.FALCON_USER,
+                MetadataTestContext.CLUSTER_ENTITY_NAME, "Critical",
+                MetadataTestContext.PROCESS_ENTITY_NAME,
+                "imp-click-join1/2014-01-01T00:00Z", "imp-click-join2/2014-01-01T00:00Z",
                 "impression-feed/2014-01-01T00:00Z", "clicks-feed/2014-01-01T00:00Z");
 
         verifyVertexEdges(vertexId, LineageMetadataResource.BOTH, expectedSize, expected);
@@ -276,8 +221,6 @@ public class LineageMetadataResourceTest {
         verifyVertexEdgesCount(vertexId, LineageMetadataResource.BOTH_IDS, expectedSize);
     }
 
-
-
     @Test (expectedExceptions = WebApplicationException.class)
     public void testVertexEdgesForIdAndInvalidDirection() throws Exception {
         LineageMetadataResource resource = new LineageMetadataResource();
@@ -324,15 +267,15 @@ public class LineageMetadataResourceTest {
 
     @Test
     public void testEdgesById() throws Exception {
-        String processInstance = PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
-        Vertex vertex = service.getGraph().getVertices(
+        String processInstance = MetadataTestContext.PROCESS_ENTITY_NAME + "/2014-01-01T01:00Z";
+        LineageMetadataResource resource = new LineageMetadataResource();
+        Vertex vertex = resource.getGraph().getVertices(
                 RelationshipProperty.NAME.getName(), processInstance).iterator().next();
 
         Edge edge = vertex.getEdges(Direction.OUT,
                 RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()).iterator().next();
         Vertex toVertex = edge.getVertex(Direction.IN);
 
-        LineageMetadataResource resource = new LineageMetadataResource();
         Response response = resource.getEdge(String.valueOf(edge.getId()));
         Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
 
@@ -352,7 +295,7 @@ public class LineageMetadataResourceTest {
 
         Map results = (Map) JSONValue.parse(response.getEntity().toString());
         long totalSize = Long.valueOf(results.get(LineageMetadataResource.TOTAL_SIZE).toString());
-        Assert.assertEquals(totalSize, getVerticesCount(service.getGraph()));
+        Assert.assertEquals(totalSize, getVerticesCount(resource.getGraph()));
     }
 
     @Test
@@ -363,7 +306,7 @@ public class LineageMetadataResourceTest {
 
         Map results = (Map) JSONValue.parse(response.getEntity().toString());
         long totalSize = Long.valueOf(results.get(LineageMetadataResource.TOTAL_SIZE).toString());
-        Assert.assertEquals(totalSize, getEdgesCount(service.getGraph()));
+        Assert.assertEquals(totalSize, getEdgesCount(resource.getGraph()));
     }
 
     @Test (expectedExceptions = WebApplicationException.class)
@@ -406,7 +349,7 @@ public class LineageMetadataResourceTest {
             Assert.assertEquals(response.getEntity().toString(), "Lineage Metadata Service is not enabled.");
         } finally {
             Services.get().register(new WorkflowJobEndNotificationService());
-            Services.get().register(service);
+            Services.get().register(testContext.getService());
         }
     }
 
@@ -445,16 +388,16 @@ public class LineageMetadataResourceTest {
     }
 
     private void assertProcessInstanceRelationships(Map vertexProperties) {
-        Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), FALCON_USER);
+        Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), MetadataTestContext.FALCON_USER);
         Assert.assertEquals(vertexProperties.get(RelationshipLabel.PROCESS_CLUSTER_EDGE.getName()),
-                CLUSTER_ENTITY_NAME);
+                MetadataTestContext.CLUSTER_ENTITY_NAME);
         Assert.assertEquals(vertexProperties.get("classified-as"), "Critical");
     }
 
     private void assertFeedInstanceRelationships(Map vertexProperties, boolean skipRelationships) {
-        Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), FALCON_USER);
+        Assert.assertEquals(vertexProperties.get(RelationshipLabel.USER.getName()), MetadataTestContext.FALCON_USER);
         Assert.assertEquals(vertexProperties.get(RelationshipLabel.FEED_CLUSTER_EDGE.getName()),
-                CLUSTER_ENTITY_NAME);
+                MetadataTestContext.CLUSTER_ENTITY_NAME);
 
         if (!skipRelationships) {
             Assert.assertEquals(vertexProperties.get(RelationshipLabel.GROUPS.getName()), "analytics");
@@ -462,113 +405,6 @@ public class LineageMetadataResourceTest {
         }
     }
 
-    public void addClusterEntity() throws Exception {
-        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
-                COLO_NAME, "classification=production");
-        configStore.publish(EntityType.CLUSTER, clusterEntity);
-    }
-
-    public void addFeedEntity() throws Exception {
-        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
-                "classified-as=Secure", "analytics");
-        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, impressionsFeed);
-        inputFeeds.add(impressionsFeed);
-
-        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
-        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, clicksFeed);
-        inputFeeds.add(clicksFeed);
-
-        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi");
-        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, join1Feed);
-        outputFeeds.add(join1Feed);
-
-        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
-                "classified-as=Secure,classified-as=Financial", "reporting,bi");
-        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
-        configStore.publish(EntityType.FEED, join2Feed);
-        outputFeeds.add(join2Feed);
-    }
-
-    public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
-        if (storageType == Storage.TYPE.FILESYSTEM) {
-            Locations locations = new Locations();
-            feed.setLocations(locations);
-
-            Location location = new Location();
-            location.setType(LocationType.DATA);
-            location.setPath(uriTemplate);
-            feed.getLocations().getLocations().add(location);
-        } else {
-            CatalogTable table = new CatalogTable();
-            table.setUri(uriTemplate);
-            feed.setTable(table);
-        }
-    }
-
-    public void addProcessEntity() throws Exception {
-        Process processEntity = EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
-                clusterEntity, "classified-as=Critical");
-        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
-
-        for (Feed inputFeed : inputFeeds) {
-            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
-        }
-
-        for (Feed outputFeed : outputFeeds) {
-            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
-        }
-
-        configStore.publish(EntityType.PROCESS, processEntity);
-    }
-
-    public void addInstance() throws Exception {
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
-                WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
-    }
-
-    private static String[] getTestMessageArgs() {
-        return new String[]{
-            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
-            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
-            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
-            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
-
-            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
-
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
-            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
-
-            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
-            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
-            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
-            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
-            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
-
-            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
-            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
-            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
-
-
-            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
-            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
-            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
-            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
-            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
-
-            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
-            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
-        };
-    }
-
     private long getVerticesCount(final Graph graph) {
         long count = 0;
         for (Vertex ignored : graph.getVertices()) {
@@ -586,25 +422,4 @@ public class LineageMetadataResourceTest {
 
         return count;
     }
-
-    private void cleanupGraphStore(Graph graph) {
-        for (Edge edge : graph.getEdges()) {
-            graph.removeEdge(edge);
-        }
-
-        for (Vertex vertex : graph.getVertices()) {
-            graph.removeVertex(vertex);
-        }
-
-        graph.shutdown();
-    }
-
-    private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
-        for (EntityType type : EntityType.values()) {
-            Collection<String> entities = store.getEntities(type);
-            for (String entity : entities) {
-                store.remove(type, entity);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
new file mode 100644
index 0000000..0198172
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataDiscoveryResourceTest.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import org.json.simple.JSONValue;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Unit tests for org.apache.falcon.resource.metadata.MetadataDiscoveryResource.
+ */
+public class MetadataDiscoveryResourceTest {
+
+    private MetadataTestContext testContext;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        testContext = new MetadataTestContext();
+        testContext.setUp();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        testContext.tearDown();
+    }
+
+    @Test
+    public void testListDimensionsFeed() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("feed_entity", MetadataTestContext.CLUSTER_ENTITY_NAME);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertTrue(dimensions.contains("impression-feed"));
+
+        response = resource.listDimensionValues("feed_entity", null);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+        Assert.assertTrue(dimensions.contains("impression-feed"));
+    }
+
+    @Test
+    public void testListDimensionsProcess() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("process_entity",
+                MetadataTestContext.CLUSTER_ENTITY_NAME);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(dimensions.get(0), MetadataTestContext.PROCESS_ENTITY_NAME);
+
+        response = resource.listDimensionValues("process_entity", null);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+    }
+
+    @Test
+    public void testListDimensionsCluster() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("cluster_entity", MetadataTestContext.CLUSTER_ENTITY_NAME);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(dimensions.get(0), MetadataTestContext.CLUSTER_ENTITY_NAME);
+
+        response = resource.listDimensionValues("cluster_entity", null);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        Assert.assertEquals(dimensions.get(0), MetadataTestContext.CLUSTER_ENTITY_NAME);
+    }
+
+    @Test
+    public void testListDimensionsColo() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("colo", MetadataTestContext.CLUSTER_ENTITY_NAME);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(dimensions.get(0), MetadataTestContext.COLO_NAME);
+
+        response = resource.listDimensionValues("colo", null);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        Assert.assertEquals(dimensions.get(0), MetadataTestContext.COLO_NAME);
+
+        try {
+            resource.listDimensionValues("INVALID", null);
+            Assert.fail("Expected an exception for an invalid dimension type");
+        } catch (Exception expected) {
+            // expected: invalid dimension types are rejected
+        }
+    }
+
+    @Test
+    public void testListDimensionsPipelines() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("pipelines", null);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 1);
+        Assert.assertEquals(dimensions.get(0), "testPipeline");
+
+        response = resource.listDimensionValues("pipelines", MetadataTestContext.CLUSTER_ENTITY_NAME);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 0);
+    }
+
+    @Test
+    public void testListDimensionsTags() throws Exception {
+        MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
+        Response response = resource.listDimensionValues("tags", null);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 4);
+        List dimensions = (List) results.get(MetadataDiscoveryResource.RESULTS);
+        Assert.assertTrue(dimensions.contains("production"));
+
+        response = resource.listDimensionValues("tags", MetadataTestContext.CLUSTER_ENTITY_NAME);
+        results = (Map) JSONValue.parse(response.getEntity().toString());
+        Assert.assertEquals(Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString()), 0);
+    }
+}
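
Taken together, the tests above cover every dimension type the new discovery endpoint lists: cluster_entity, feed_entity, process_entity, colo, pipelines and tags, plus rejection of an unknown type. For reference, a minimal sketch of a direct call, mirroring the tests (the response keys come from the TOTAL_SIZE and RESULTS constants referenced above; the values in the comments are illustrative only):

    MetadataDiscoveryResource resource = new MetadataDiscoveryResource();
    Response response = resource.listDimensionValues("tags", null);  // null cluster => no cluster filter
    Map results = (Map) JSONValue.parse(response.getEntity().toString());
    List values = (List) results.get(MetadataDiscoveryResource.RESULTS);    // e.g. ["production", ...]
    int total = Integer.parseInt(results.get(MetadataDiscoveryResource.TOTAL_SIZE).toString());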

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
new file mode 100644
index 0000000..aaddf62
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/resource/metadata/MetadataTestContext.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource.metadata;
+
+import com.tinkerpop.blueprints.Edge;
+import com.tinkerpop.blueprints.Graph;
+import com.tinkerpop.blueprints.Vertex;
+import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
+import org.apache.falcon.entity.Storage;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.metadata.MetadataMappingService;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.service.Services;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
+import org.testng.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Shared test context that sets up entities and lineage metadata for the metadata REST API tests.
+ */
+public class MetadataTestContext {
+    public static final String FALCON_USER = "falcon-user";
+    private static final String LOGS_DIR = "target/log";
+    private static final String NOMINAL_TIME = "2014-01-01-01-00";
+    public static final String OPERATION = "GENERATE";
+
+    public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
+    public static final String PROCESS_ENTITY_NAME = "sample-process";
+    public static final String COLO_NAME = "west-coast";
+    public static final String WORKFLOW_NAME = "imp-click-join-workflow";
+    public static final String WORKFLOW_VERSION = "1.0.9";
+
+    public static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
+    public static final String INPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/impression-feed/20140101#jail://global:00/falcon/clicks-feed/20140101";
+
+    public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
+    public static final String OUTPUT_INSTANCE_PATHS =
+            "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
+
+    private ConfigurationStore configStore;
+    private MetadataMappingService service;
+
+    private Cluster clusterEntity;
+    private List<Feed> inputFeeds = new ArrayList<Feed>();
+    private List<Feed> outputFeeds = new ArrayList<Feed>();
+
+    public MetadataTestContext() {}
+
+    public void setUp() throws Exception {
+        CurrentUser.authenticate(FALCON_USER);
+
+        configStore = ConfigurationStore.get();
+
+        Services.get().register(new WorkflowJobEndNotificationService());
+        Assert.assertTrue(Services.get().isRegistered(WorkflowJobEndNotificationService.SERVICE_NAME));
+
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
+        service = new MetadataMappingService();
+        service.init();
+        Services.get().register(service);
+        Assert.assertTrue(Services.get().isRegistered(MetadataMappingService.SERVICE_NAME));
+
+        addClusterEntity();
+        addFeedEntity();
+        addProcessEntity();
+        addInstance();
+    }
+
+    public MetadataMappingService getService() {
+        return this.service;
+    }
+
+    public void tearDown() throws Exception {
+        cleanupGraphStore(service.getGraph());
+        cleanupConfigurationStore(configStore);
+
+        service.destroy();
+
+        StartupProperties.get().setProperty("falcon.graph.preserve.history", "false");
+        Services.get().reset();
+    }
+
+    public void addClusterEntity() throws Exception {
+        clusterEntity = EntityBuilderTestUtil.buildCluster(CLUSTER_ENTITY_NAME,
+                COLO_NAME, "classification=production");
+        configStore.publish(EntityType.CLUSTER, clusterEntity);
+    }
+
+    public void addFeedEntity() throws Exception {
+        Feed impressionsFeed = EntityBuilderTestUtil.buildFeed("impression-feed", clusterEntity,
+                "classified-as=Secure", "analytics");
+        addStorage(impressionsFeed, Storage.TYPE.FILESYSTEM, "/falcon/impression-feed/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, impressionsFeed);
+        inputFeeds.add(impressionsFeed);
+
+        Feed clicksFeed = EntityBuilderTestUtil.buildFeed("clicks-feed", clusterEntity, null, null);
+        addStorage(clicksFeed, Storage.TYPE.FILESYSTEM, "/falcon/clicks-feed/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, clicksFeed);
+        inputFeeds.add(clicksFeed);
+
+        Feed join1Feed = EntityBuilderTestUtil.buildFeed("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi");
+        addStorage(join1Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, join1Feed);
+        outputFeeds.add(join1Feed);
+
+        Feed join2Feed = EntityBuilderTestUtil.buildFeed("imp-click-join2", clusterEntity,
+                "classified-as=Secure,classified-as=Financial", "reporting,bi");
+        addStorage(join2Feed, Storage.TYPE.FILESYSTEM, "/falcon/imp-click-join2/${YEAR}${MONTH}${DAY}");
+        configStore.publish(EntityType.FEED, join2Feed);
+        outputFeeds.add(join2Feed);
+    }
+
+    public static void addStorage(Feed feed, Storage.TYPE storageType, String uriTemplate) {
+        if (storageType == Storage.TYPE.FILESYSTEM) {
+            Locations locations = new Locations();
+            feed.setLocations(locations);
+
+            Location location = new Location();
+            location.setType(LocationType.DATA);
+            location.setPath(uriTemplate);
+            feed.getLocations().getLocations().add(location);
+        } else {
+            CatalogTable table = new CatalogTable();
+            table.setUri(uriTemplate);
+            feed.setTable(table);
+        }
+    }
+
+    public void addProcessEntity() throws Exception {
+        org.apache.falcon.entity.v0.process.Process processEntity =
+                EntityBuilderTestUtil.buildProcess(PROCESS_ENTITY_NAME,
+                clusterEntity, "classified-as=Critical", "testPipeline");
+        EntityBuilderTestUtil.addProcessWorkflow(processEntity, WORKFLOW_NAME, WORKFLOW_VERSION);
+
+        for (Feed inputFeed : inputFeeds) {
+            EntityBuilderTestUtil.addInput(processEntity, inputFeed);
+        }
+
+        for (Feed outputFeed : outputFeeds) {
+            EntityBuilderTestUtil.addOutput(processEntity, outputFeed);
+        }
+
+        configStore.publish(EntityType.PROCESS, processEntity);
+    }
+
+    public void addInstance() throws Exception {
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+    }
+
+    private void cleanupGraphStore(Graph graph) {
+        for (Edge edge : graph.getEdges()) {
+            graph.removeEdge(edge);
+        }
+
+        for (Vertex vertex : graph.getVertices()) {
+            graph.removeVertex(vertex);
+        }
+
+        graph.shutdown();
+    }
+
+    private static void cleanupConfigurationStore(ConfigurationStore store) throws Exception {
+        for (EntityType type : EntityType.values()) {
+            Collection<String> entities = store.getEntities(type);
+            for (String entity : entities) {
+                store.remove(type, entity);
+            }
+        }
+    }
+
+    private static String[] getTestMessageArgs() {
+        return new String[]{
+            "-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), ("process"),
+            "-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), PROCESS_ENTITY_NAME,
+            "-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
+            "-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
+            "-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
+            "-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
+            "-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
+            "-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
+            "-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
+            "-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
+            "-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
+            "-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
+            "-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), "blah",
+            "-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), "blah",
+            "-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
+            "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
+            "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
+            "-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index caee676..31855bc 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -20,6 +20,7 @@ package org.apache.falcon.cli;
 
 import org.apache.commons.io.FilenameUtils;
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.metadata.RelationshipType;
 import org.apache.falcon.resource.TestContext;
 import org.apache.falcon.util.OozieTestUtils;
 import org.testng.Assert;
@@ -708,6 +709,42 @@ public class FalconCLIIT {
                 executeWithURL("instance -running -type process -name " + overlay.get("processName")));
     }
 
+    @Test
+    public void testMetadataListCommands() throws Exception {
+        TestContext context = new TestContext();
+        Map<String, String> overlay = context.getUniqueOverlay();
+        submitTestFiles(context, overlay);
+
+        String processName = overlay.get("processName");
+        String feedName = overlay.get("outputFeedName");
+        String clusterName = overlay.get("cluster");
+
+        Assert.assertEquals(0,
+                executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
+                        + FalconCLI.ENTITY_TYPE_OPT + " process  -" + FalconCLI.ENTITY_NAME_OPT + " " + processName));
+
+        Assert.assertEquals(0,
+                executeWithURL(FalconCLI.ENTITY_CMD + " -" + FalconCLI.SCHEDULE_OPT + " -"
+                        + FalconCLI.ENTITY_TYPE_OPT + " feed -" + FalconCLI.ENTITY_NAME_OPT + " " + feedName));
+
+        OozieTestUtils.waitForProcessWFtoStart(context);
+
+        String metadataListCommand = FalconCLI.METADATA_CMD + " -" + FalconMetadataCLI.LIST_OPT + " -"
+                + FalconMetadataCLI.TYPE_OPT + " ";
+        String clusterString = " -" + FalconMetadataCLI.CLUSTER_OPT + " " + clusterName;
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.CLUSTER_ENTITY.toString()));
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.PROCESS_ENTITY.toString()));
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.FEED_ENTITY.toString()));
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.PROCESS_ENTITY.toString()
+                + clusterString));
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.FEED_ENTITY.toString()
+                + clusterString));
+        Assert.assertEquals(0, executeWithURL(metadataListCommand + RelationshipType.CLUSTER_ENTITY.toString()
+                + clusterString));
+
+        Assert.assertEquals(-1, executeWithURL(metadataListCommand + "feed"));
+        Assert.assertEquals(-1, executeWithURL(metadataListCommand + "invalid"));
+    }
 
     public void testContinue() throws Exception {
         TestContext context = new TestContext();

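The new test builds each command through executeWithURL and the FalconCLI/FalconMetadataCLI option constants. Outside the test harness the same operation maps to an invocation of roughly this shape (the lowercase type strings are an assumption based on how the RelationshipType values are rendered; the cluster name is illustrative):

    falcon metadata -list -type cluster_entity
    falcon metadata -list -type feed_entity -cluster primary-cluster
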
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index 89132d6..0f2a971 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -54,7 +54,7 @@ public class PigProcessIT {
 
     @BeforeClass
     public void prepare() throws Exception {
-        TestContext.prepare(CLUSTER_TEMPLATE);
+        TestContext.prepare(CLUSTER_TEMPLATE, true);
 
         overlay = context.getUniqueOverlay();
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
index 91662d4..51afbb8 100644
--- a/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/TableStorageProcessIT.java
@@ -66,7 +66,7 @@ public class TableStorageProcessIT {
 
     @BeforeClass
     public void prepare() throws Exception {
-        TestContext.prepare(CLUSTER_TEMPLATE);
+        TestContext.prepare(CLUSTER_TEMPLATE, true);
 
         overlay = context.getUniqueOverlay();
         String filePath = TestContext.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/7f98570b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
new file mode 100644
index 0000000..462bcdc
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/MetadataResourceJerseyIT.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.resource.metadata.AbstractMetadataResource;
+import org.json.simple.JSONValue;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test class for Metadata REST APIs.
+ *
+ * Tests should be enabled only in local environments as they need a running instance of the web server.
+ */
+public class MetadataResourceJerseyIT {
+
+    private TestContext context;
+
+    @BeforeClass
+    public void prepare() throws Exception {
+        context = newContext();
+        ClientResponse response;
+        Map<String, String> overlay = context.getUniqueOverlay();
+
+        response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.FEED_TEMPLATE2, overlay, EntityType.FEED);
+        context.assertSuccessful(response);
+
+        response = context.submitToFalcon(TestContext.PROCESS_TEMPLATE, overlay, EntityType.PROCESS);
+        context.assertSuccessful(response);
+    }
+
+    @Test
+    public void testMetadataList() throws Exception {
+
+        ClientResponse response = context.service
+                .path("api/metadata/cluster_entity/list")
+                .header("Cookie", context.getAuthenticationToken())
+                .accept(MediaType.APPLICATION_JSON)
+                .get(ClientResponse.class);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        Map results = (Map) JSONValue.parse(response.getEntity(String.class));
+        List dimensions = (List) results.get(AbstractMetadataResource.RESULTS);
+        Assert.assertTrue(dimensions.contains(context.clusterName));
+
+        response = context.service
+                .path("api/metadata/process_entity/list")
+                .queryParam("cluster", context.clusterName)
+                .header("Cookie", context.getAuthenticationToken())
+                .accept(MediaType.APPLICATION_JSON)
+                .get(ClientResponse.class);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        results = (Map) JSONValue.parse(response.getEntity(String.class));
+        dimensions = (List) results.get(AbstractMetadataResource.RESULTS);
+        Assert.assertTrue(dimensions.contains(context.processName));
+
+        response = context.service
+                .path("api/metadata/process_entity/list")
+                .queryParam("cluster", "random")
+                .header("Cookie", context.getAuthenticationToken())
+                .accept(MediaType.APPLICATION_JSON)
+                .get(ClientResponse.class);
+        Assert.assertEquals(response.getStatus(), Response.Status.OK.getStatusCode());
+        results = (Map) JSONValue.parse(response.getEntity(String.class));
+        Assert.assertEquals(((Long)results.get(AbstractMetadataResource.TOTAL_SIZE)).longValue(), 0);
+    }
+
+    private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+    private TestContext newContext() throws Exception {
+        TestContext.prepare(TestContext.CLUSTER_TEMPLATE, false);
+        contexts.set(new TestContext());
+        return contexts.get();
+    }
+}
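
The Jersey test above exercises the servlet end of the same API. At the wire level, the calls it makes reduce to roughly this request/response shape (the field names correspond to the RESULTS and TOTAL_SIZE constants on AbstractMetadataResource; the exact JSON keys and values shown are assumptions for illustration):

    GET /api/metadata/process_entity/list?cluster=primary-cluster
    Accept: application/json

    { "results": ["sample-process"], "totalSize": 1 }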